1use super::stages::{execute_executor_stage, find_implementation, is_executor_stage, StageFn};
2use super::{ExecutionError, StageExecutor};
3use noether_core::stage::StageId;
4use noether_store::StageStore;
5use serde_json::Value;
6use std::collections::HashMap;
7
8#[derive(Default)]
31pub struct InlineRegistry {
32 extra_fns: HashMap<String, StageFn>,
33}
34
35impl InlineRegistry {
36 pub fn new() -> Self {
39 Self {
40 extra_fns: HashMap::new(),
41 }
42 }
43
44 pub fn register(&mut self, description: impl Into<String>, f: StageFn) -> &mut Self {
50 self.extra_fns.insert(description.into(), f);
51 self
52 }
53
54 pub(crate) fn find(&self, description: &str) -> Option<StageFn> {
60 if let Some(&f) = self.extra_fns.get(description) {
61 return Some(f);
62 }
63 find_implementation(description)
64 }
65
66 pub fn len(&self) -> usize {
68 self.extra_fns.len()
69 }
70
71 pub fn is_empty(&self) -> bool {
73 self.extra_fns.is_empty()
74 }
75}
76
77pub struct InlineExecutor {
83 implementations: HashMap<String, StageFn>,
84 fallback_outputs: HashMap<String, Value>,
85 descriptions: HashMap<String, String>,
87}
88
89impl InlineExecutor {
90 pub fn from_store(store: &(impl StageStore + ?Sized)) -> Self {
95 Self::from_store_with_registry(store, InlineRegistry::new())
96 }
97
98 pub fn from_store_with_registry(
103 store: &(impl StageStore + ?Sized),
104 registry: InlineRegistry,
105 ) -> Self {
106 let mut implementations = HashMap::new();
107 let mut fallback_outputs = HashMap::new();
108 let mut descriptions = HashMap::new();
109
110 for stage in store.list(None) {
111 if let Some(func) = registry.find(&stage.description) {
112 implementations.insert(stage.id.0.clone(), func);
113 }
114 if let Some(example) = stage.examples.first() {
115 fallback_outputs.insert(stage.id.0.clone(), example.output.clone());
116 }
117 descriptions.insert(stage.id.0.clone(), stage.description.clone());
118 }
119
120 Self {
121 implementations,
122 fallback_outputs,
123 descriptions,
124 }
125 }
126
127 pub fn has_implementation(&self, stage_id: &StageId) -> bool {
129 self.implementations.contains_key(&stage_id.0)
130 || self.is_hof_stage(stage_id)
131 || self.is_csv_stage(stage_id)
132 || self.is_executor_hof(stage_id)
133 }
134
135 fn description_of(&self, stage_id: &StageId) -> Option<&str> {
136 self.descriptions.get(&stage_id.0).map(|s| s.as_str())
137 }
138
139 fn is_hof_stage(&self, stage_id: &StageId) -> bool {
140 matches!(
141 self.description_of(stage_id),
142 Some("Apply a stage to each element of a list")
143 | Some("Keep only elements where the predicate stage returns true")
144 | Some(
145 "Reduce a list to a single value by applying a stage to accumulator and each element"
146 )
147 )
148 }
149
150 fn is_csv_stage(&self, stage_id: &StageId) -> bool {
151 matches!(
152 self.description_of(stage_id),
153 Some("Parse CSV text into a list of row maps")
154 | Some("Serialize a list of row maps to CSV text")
155 )
156 }
157
158 fn is_executor_hof(&self, stage_id: &StageId) -> bool {
159 self.description_of(stage_id)
160 .map(is_executor_stage)
161 .unwrap_or(false)
162 }
163
164 fn execute_hof(&self, stage_id: &StageId, input: &Value) -> Result<Value, ExecutionError> {
165 let desc = self.description_of(stage_id).unwrap_or("");
166 match desc {
167 "Apply a stage to each element of a list" => self.execute_map(input),
168 "Keep only elements where the predicate stage returns true" => {
169 self.execute_filter(input)
170 }
171 "Reduce a list to a single value by applying a stage to accumulator and each element" => {
172 self.execute_reduce(input)
173 }
174 _ => unreachable!(),
175 }
176 }
177
178 fn execute_map(&self, input: &Value) -> Result<Value, ExecutionError> {
179 let items = input
180 .get("items")
181 .and_then(|v| v.as_array())
182 .ok_or_else(|| ExecutionError::StageFailed {
183 stage_id: StageId("map".into()),
184 message: "items must be an array".into(),
185 })?;
186 let child_id = input
187 .get("stage_id")
188 .and_then(|v| v.as_str())
189 .ok_or_else(|| ExecutionError::StageFailed {
190 stage_id: StageId("map".into()),
191 message: "stage_id must be a string".into(),
192 })?;
193 let child = StageId(child_id.into());
194
195 let mut results = Vec::with_capacity(items.len());
196 for item in items {
197 results.push(self.execute(&child, item)?);
198 }
199 Ok(Value::Array(results))
200 }
201
202 fn execute_filter(&self, input: &Value) -> Result<Value, ExecutionError> {
203 let items = input
204 .get("items")
205 .and_then(|v| v.as_array())
206 .ok_or_else(|| ExecutionError::StageFailed {
207 stage_id: StageId("filter".into()),
208 message: "items must be an array".into(),
209 })?;
210 let child_id = input
211 .get("stage_id")
212 .and_then(|v| v.as_str())
213 .ok_or_else(|| ExecutionError::StageFailed {
214 stage_id: StageId("filter".into()),
215 message: "stage_id must be a string".into(),
216 })?;
217 let child = StageId(child_id.into());
218
219 let mut results = Vec::new();
220 for item in items {
221 let predicate_result = self.execute(&child, item)?;
222 let keep = match &predicate_result {
223 Value::Bool(b) => *b,
224 _ => false,
225 };
226 if keep {
227 results.push(item.clone());
228 }
229 }
230 Ok(Value::Array(results))
231 }
232
233 fn execute_reduce(&self, input: &Value) -> Result<Value, ExecutionError> {
234 let items = input
235 .get("items")
236 .and_then(|v| v.as_array())
237 .ok_or_else(|| ExecutionError::StageFailed {
238 stage_id: StageId("reduce".into()),
239 message: "items must be an array".into(),
240 })?;
241 let child_id = input
242 .get("stage_id")
243 .and_then(|v| v.as_str())
244 .ok_or_else(|| ExecutionError::StageFailed {
245 stage_id: StageId("reduce".into()),
246 message: "stage_id must be a string".into(),
247 })?;
248 let initial = input.get("initial").cloned().unwrap_or(Value::Null);
249 let child = StageId(child_id.into());
250
251 let mut accumulator = initial;
252 for item in items {
253 let reducer_input = serde_json::json!({
254 "accumulator": accumulator,
255 "item": item,
256 });
257 accumulator = self.execute(&child, &reducer_input)?;
258 }
259 Ok(accumulator)
260 }
261
262 fn execute_csv(&self, stage_id: &StageId, input: &Value) -> Result<Value, ExecutionError> {
263 let desc = self.description_of(stage_id).unwrap_or("");
264 match desc {
265 "Parse CSV text into a list of row maps" => csv_parse(input),
266 "Serialize a list of row maps to CSV text" => csv_write(input),
267 _ => unreachable!(),
268 }
269 }
270}
271
272fn csv_parse(input: &Value) -> Result<Value, ExecutionError> {
273 let text =
274 input
275 .get("text")
276 .and_then(|v| v.as_str())
277 .ok_or_else(|| ExecutionError::StageFailed {
278 stage_id: StageId("csv_parse".into()),
279 message: "text must be a string".into(),
280 })?;
281 let has_header = input
282 .get("has_header")
283 .and_then(|v| v.as_bool())
284 .unwrap_or(true);
285 let delimiter = input
286 .get("delimiter")
287 .and_then(|v| v.as_str())
288 .unwrap_or(",");
289
290 let mut lines: Vec<&str> = text.lines().collect();
291 if lines.is_empty() {
292 return Ok(Value::Array(vec![]));
293 }
294
295 let headers: Vec<&str> = if has_header {
296 let header_line = lines.remove(0);
297 header_line.split(delimiter).collect()
298 } else {
299 let first = lines.first().unwrap_or(&"");
301 let count = first.split(delimiter).count();
302 (0..count)
303 .map(|i| Box::leak(format!("col{i}").into_boxed_str()) as &str)
304 .collect()
305 };
306
307 let mut rows = Vec::new();
308 for line in &lines {
309 if line.trim().is_empty() {
310 continue;
311 }
312 let values: Vec<&str> = line.split(delimiter).collect();
313 let mut row = serde_json::Map::new();
314 for (i, header) in headers.iter().enumerate() {
315 let val = values.get(i).unwrap_or(&"");
316 row.insert(header.to_string(), Value::String(val.to_string()));
317 }
318 rows.push(Value::Object(row));
319 }
320 Ok(Value::Array(rows))
321}
322
323fn csv_write(input: &Value) -> Result<Value, ExecutionError> {
324 let records = input
325 .get("records")
326 .and_then(|v| v.as_array())
327 .ok_or_else(|| ExecutionError::StageFailed {
328 stage_id: StageId("csv_write".into()),
329 message: "records must be an array".into(),
330 })?;
331 let delimiter = input
332 .get("delimiter")
333 .and_then(|v| v.as_str())
334 .unwrap_or(",");
335
336 if records.is_empty() {
337 return Ok(Value::String(String::new()));
338 }
339
340 let mut headers: Vec<String> = records
342 .first()
343 .and_then(|r| r.as_object())
344 .map(|obj| obj.keys().cloned().collect())
345 .unwrap_or_default();
346 headers.sort();
347
348 let mut lines = Vec::new();
349 lines.push(headers.join(delimiter));
351
352 for record in records {
354 if let Some(obj) = record.as_object() {
355 let values: Vec<String> = headers
356 .iter()
357 .map(|h| {
358 obj.get(h)
359 .and_then(|v| v.as_str())
360 .unwrap_or("")
361 .to_string()
362 })
363 .collect();
364 lines.push(values.join(delimiter));
365 }
366 }
367
368 Ok(Value::String(lines.join("\n")))
369}
370
371impl StageExecutor for InlineExecutor {
372 fn execute(&self, stage_id: &StageId, input: &Value) -> Result<Value, ExecutionError> {
373 if self.is_hof_stage(stage_id) {
375 return self.execute_hof(stage_id, input);
376 }
377 if self.is_executor_hof(stage_id) {
379 let desc = self.description_of(stage_id).unwrap_or("");
380 return execute_executor_stage(self, desc, input);
381 }
382 if self.is_csv_stage(stage_id) {
384 return self.execute_csv(stage_id, input);
385 }
386 if let Some(func) = self.implementations.get(&stage_id.0) {
388 return func(input);
389 }
390 if let Some(output) = self.fallback_outputs.get(&stage_id.0) {
392 return Ok(output.clone());
393 }
394 Err(ExecutionError::StageNotFound(stage_id.clone()))
396 }
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use noether_core::stdlib::load_stdlib;
    use noether_store::{MemoryStore, StageStore};
    use serde_json::json;

    /// Returns an in-memory store holding every stage from `load_stdlib`.
    fn init_store() -> MemoryStore {
        let mut store = MemoryStore::new();
        load_stdlib().into_iter().for_each(|stage| {
            store.put(stage).unwrap();
        });
        store
    }

    /// Returns the stdlib store together with an executor built from it
    /// without any extra registry entries.
    fn stdlib_fixture() -> (MemoryStore, InlineExecutor) {
        let store = init_store();
        let executor = InlineExecutor::from_store(&store);
        (store, executor)
    }

    /// Id of the first stage in `store` whose description contains `desc`.
    /// Panics when no stage matches.
    fn find_id(store: &MemoryStore, desc: &str) -> StageId {
        let stages = store.list(None);
        let hit = stages
            .iter()
            .find(|stage| stage.description.contains(desc))
            .unwrap();
        hit.id.clone()
    }

    #[test]
    fn inline_to_text() {
        let (store, executor) = stdlib_fixture();
        let to_text = find_id(&store, "Convert any value to its text");

        assert!(executor.has_implementation(&to_text));
        let output = executor.execute(&to_text, &json!(42)).unwrap();
        assert_eq!(output, json!("42"));
    }

    #[test]
    fn inline_parse_json() {
        let (store, executor) = stdlib_fixture();
        let parse_json = find_id(&store, "Parse a JSON string");

        let output = executor
            .execute(&parse_json, &json!(r#"{"a":1}"#))
            .unwrap();
        assert_eq!(output, json!({"a": 1}));
    }

    #[test]
    fn inline_text_split() {
        let (store, executor) = stdlib_fixture();
        let split = find_id(&store, "Split text by a delimiter");

        let request = json!({"text": "a,b,c", "delimiter": ","});
        let output = executor.execute(&split, &request).unwrap();
        assert_eq!(output, json!(["a", "b", "c"]));
    }

    #[test]
    fn inline_text_hash() {
        let (store, executor) = stdlib_fixture();
        let hash = find_id(&store, "Compute a cryptographic hash");

        let request = json!({"text": "hello", "algorithm": "sha256"});
        let output = executor.execute(&hash, &request).unwrap();
        assert_eq!(
            output["hash"],
            "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
        );
    }

    #[test]
    fn inline_sort() {
        let (store, executor) = stdlib_fixture();
        let sort = find_id(&store, "Sort a list");

        let request = json!({"items": [3, 1, 2], "key": null, "descending": false});
        let output = executor.execute(&sort, &request).unwrap();
        assert_eq!(output, json!([1, 2, 3]));
    }

    #[test]
    fn inline_json_merge() {
        let (store, executor) = stdlib_fixture();
        let merge = find_id(&store, "Deep merge two JSON");

        let request = json!({"base": {"a": 1}, "patch": {"b": 2}});
        let output = executor.execute(&merge, &request).unwrap();
        assert_eq!(output, json!({"a": 1, "b": 2}));
    }

    /// A stage without an implementation is still executable through the
    /// output of its first example.
    #[test]
    fn fallback_for_unimplemented() {
        let (store, executor) = stdlib_fixture();
        let llm = find_id(&store, "Generate text completion using a language model");

        assert!(!executor.has_implementation(&llm));
        let output = executor.execute(&llm, &json!(null)).unwrap();
        assert!(output.is_object());
    }

    #[test]
    fn inline_map_with_to_text() {
        let (store, executor) = stdlib_fixture();
        let map_id = find_id(&store, "Apply a stage to each element");
        let to_text_id = find_id(&store, "Convert any value to its text");

        let request = json!({"items": [1, 2, 3], "stage_id": to_text_id.0});
        let output = executor.execute(&map_id, &request).unwrap();
        assert_eq!(output, json!(["1", "2", "3"]));
    }

    #[test]
    fn inline_filter_with_to_bool() {
        let (store, executor) = stdlib_fixture();
        let filter_id = find_id(&store, "Keep only elements where");
        let to_bool_id = find_id(&store, "Convert a value to boolean");

        let request = json!({"items": [0, 1, 2, 0, 3], "stage_id": to_bool_id.0});
        let output = executor.execute(&filter_id, &request).unwrap();
        assert_eq!(output, json!([1, 2, 3]));
    }

    #[test]
    fn inline_map_empty_list() {
        let (store, executor) = stdlib_fixture();
        let map_id = find_id(&store, "Apply a stage to each element");
        let to_text_id = find_id(&store, "Convert any value to its text");

        let request = json!({"items": [], "stage_id": to_text_id.0});
        let output = executor.execute(&map_id, &request).unwrap();
        assert_eq!(output, json!([]));
    }

    #[test]
    fn inline_csv_parse() {
        let (store, executor) = stdlib_fixture();
        let parse_id = find_id(&store, "Parse CSV text into a list");

        let request =
            json!({"text": "name,age\nAlice,30\nBob,25", "has_header": true, "delimiter": null});
        let output = executor.execute(&parse_id, &request).unwrap();

        let rows = output.as_array().unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], "30");
        assert_eq!(rows[1]["name"], "Bob");
    }

    #[test]
    fn inline_csv_write() {
        let (store, executor) = stdlib_fixture();
        let write_id = find_id(&store, "Serialize a list of row maps");

        let request = json!({"records": [{"name": "Alice", "age": "30"}, {"name": "Bob", "age": "25"}], "delimiter": null});
        let output = executor.execute(&write_id, &request).unwrap();

        let text = output.as_str().unwrap();
        assert!(text.contains("Alice"));
        assert!(text.contains("Bob"));
        assert!(text.contains("age"));
    }

    #[test]
    fn inline_csv_roundtrip() {
        let (store, executor) = stdlib_fixture();
        let parse_id = find_id(&store, "Parse CSV text into a list");
        let write_id = find_id(&store, "Serialize a list of row maps");

        let csv_text = "name,age\nAlice,30\nBob,25";
        let parse_request = json!({"text": csv_text, "has_header": true, "delimiter": null});
        let parsed = executor.execute(&parse_id, &parse_request).unwrap();

        let write_request = json!({"records": parsed, "delimiter": null});
        let written = executor.execute(&write_id, &write_request).unwrap();

        let text = written.as_str().unwrap();
        assert!(text.contains("Alice"));
        assert!(text.contains("Bob"));
        assert!(text.contains("30"));
        assert!(text.contains("25"));
    }

    #[test]
    fn has_implementations_count() {
        let (store, executor) = stdlib_fixture();

        let stages = store.list(None);
        let count = stages
            .iter()
            .filter(|stage| executor.has_implementation(&stage.id))
            .count();
        assert!(
            count >= 22,
            "Expected at least 22 real implementations, got {count}"
        );
    }

    #[test]
    fn registry_register_and_find() {
        fn my_fn(_: &Value) -> Result<Value, ExecutionError> {
            Ok(json!("from_registry"))
        }

        let mut registry = InlineRegistry::new();
        assert!(registry.is_empty());
        registry.register("my custom stage", my_fn);
        assert_eq!(registry.len(), 1);

        let found = registry.find("my custom stage");
        assert!(found.is_some());
        let output = found.unwrap()(&json!(null)).unwrap();
        assert_eq!(output, json!("from_registry"));
    }

    #[test]
    fn registry_falls_back_to_stdlib() {
        let registry = InlineRegistry::new();
        let found = registry.find("Convert any value to its text representation");
        assert!(found.is_some(), "stdlib fallback should work");
    }

    #[test]
    fn registry_extra_overrides_stdlib() {
        fn override_fn(_: &Value) -> Result<Value, ExecutionError> {
            Ok(json!("overridden"))
        }

        let mut registry = InlineRegistry::new();
        registry.register("Convert any value to its text representation", override_fn);

        let resolved = registry
            .find("Convert any value to its text representation")
            .unwrap();
        let output = resolved(&json!(42)).unwrap();
        assert_eq!(
            output,
            json!("overridden"),
            "registered fn should shadow stdlib"
        );
    }

    #[test]
    fn from_store_with_registry_injects_extra_stage() {
        use noether_core::stage::StageBuilder;
        use noether_core::stdlib::stdlib_signing_key;
        use noether_core::types::NType;

        fn always_42(_: &Value) -> Result<Value, ExecutionError> {
            Ok(json!(42))
        }

        let mut store = init_store();
        let signing_key = stdlib_signing_key();
        let custom_stage = StageBuilder::new("always_42")
            .input(NType::Null)
            .output(NType::Number)
            .pure()
            .description("Return 42 always")
            .example(json!(null), json!(42.0))
            .example(json!(null), json!(42.0))
            .example(json!(null), json!(42.0))
            .example(json!(null), json!(42.0))
            .example(json!(null), json!(42.0))
            .build_stdlib(&signing_key)
            .unwrap();
        let custom_id = custom_stage.id.clone();
        store.put(custom_stage).unwrap();

        let mut registry = InlineRegistry::new();
        registry.register("Return 42 always", always_42);

        let executor = InlineExecutor::from_store_with_registry(&store, registry);
        assert!(executor.has_implementation(&custom_id));
        let output = executor.execute(&custom_id, &json!(null)).unwrap();
        assert_eq!(output, json!(42));
    }

    /// Without a registry entry the stage has no implementation, but executing
    /// it still yields the output of its first example.
    #[test]
    fn from_store_without_registry_does_not_see_extra() {
        use noether_core::stage::StageBuilder;
        use noether_core::stdlib::stdlib_signing_key;
        use noether_core::types::NType;

        let mut store = init_store();
        let signing_key = stdlib_signing_key();
        let custom_stage = StageBuilder::new("no_impl")
            .input(NType::Null)
            .output(NType::Null)
            .pure()
            .description("A stage with no registered implementation")
            .example(json!(null), json!(null))
            .example(json!(null), json!(null))
            .example(json!(null), json!(null))
            .example(json!(null), json!(null))
            .example(json!(null), json!(null))
            .build_stdlib(&signing_key)
            .unwrap();
        let custom_id = custom_stage.id.clone();
        store.put(custom_stage).unwrap();

        let executor = InlineExecutor::from_store(&store);
        assert!(!executor.has_implementation(&custom_id));
        let output = executor.execute(&custom_id, &json!(null)).unwrap();
        assert_eq!(output, json!(null));
    }
}