1use std::collections::HashMap;
7use std::fmt::Debug;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use futures::stream::BoxStream;
12use serde_json::Value;
13
14use crate::error::{Error, Result};
15
16use super::base::Runnable;
17use super::config::{ConfigOrList, RunnableConfig, ensure_config, get_config_list, merge_configs};
18use super::utils::{
19 AnyConfigurableField, ConfigurableField, ConfigurableFieldMultiOption,
20 ConfigurableFieldSingleOption, ConfigurableFieldSpec, gather_with_concurrency,
21 get_unique_config_specs,
22};
23
24pub fn prefix_config_spec(spec: &ConfigurableFieldSpec, prefix: &str) -> ConfigurableFieldSpec {
38 if spec.is_shared {
39 spec.clone()
40 } else {
41 ConfigurableFieldSpec {
42 id: format!("{}/{}", prefix, spec.id),
43 annotation: spec.annotation.clone(),
44 name: spec.name.clone(),
45 description: spec.description.clone(),
46 default: spec.default.clone(),
47 is_shared: spec.is_shared,
48 dependencies: spec.dependencies.clone(),
49 }
50 }
51}
52
53pub fn make_options_spec_single(
65 spec: &ConfigurableFieldSingleOption,
66 description: Option<&str>,
67) -> ConfigurableFieldSpec {
68 let options_str = spec.options.keys().cloned().collect::<Vec<_>>().join(", ");
69 ConfigurableFieldSpec {
70 id: spec.id.clone(),
71 annotation: format!("Enum[{}]", options_str),
72 name: spec.name.clone(),
73 description: spec
74 .description
75 .clone()
76 .or_else(|| description.map(String::from)),
77 default: Some(Value::String(spec.default.clone())),
78 is_shared: spec.is_shared,
79 dependencies: None,
80 }
81}
82
83pub fn make_options_spec_multi(
85 spec: &ConfigurableFieldMultiOption,
86 description: Option<&str>,
87) -> ConfigurableFieldSpec {
88 let options_str = spec.options.keys().cloned().collect::<Vec<_>>().join(", ");
89 ConfigurableFieldSpec {
90 id: spec.id.clone(),
91 annotation: format!("Sequence[Enum[{}]]", options_str),
92 name: spec.name.clone(),
93 description: spec
94 .description
95 .clone()
96 .or_else(|| description.map(String::from)),
97 default: Some(Value::Array(
98 spec.default
99 .iter()
100 .map(|s| Value::String(s.clone()))
101 .collect(),
102 )),
103 is_shared: spec.is_shared,
104 dependencies: None,
105 }
106}
107
/// Returns `s` with one leading occurrence of `prefix` removed.
///
/// When `s` does not start with `prefix`, an owned copy of `s` is returned
/// unchanged.
fn str_remove_prefix(s: &str, prefix: &str) -> String {
    s.strip_prefix(prefix).unwrap_or(s).to_string()
}
116
/// A [`Runnable`] that resolves, per call, which concrete runnable to execute.
///
/// NOTE(review): no implementor of this trait is visible in this part of the
/// file; the method descriptions below follow the signatures only and should
/// be confirmed against the implementors.
pub trait DynamicRunnable: Runnable {
    /// Returns the configuration field specs exposed by this runnable.
    fn config_specs(&self) -> Vec<ConfigurableFieldSpec>;

    /// Resolves an optional per-call `config` into a pair of
    /// (runnable to execute, config to pass to that runnable).
    fn prepare(
        &self,
        config: Option<RunnableConfig>,
    ) -> (
        Arc<dyn Runnable<Input = Self::Input, Output = Self::Output> + Send + Sync>,
        RunnableConfig,
    );
}
136
/// Pairs a default runnable with descriptors of fields that callers may set
/// through the `configurable` map of a [`RunnableConfig`].
///
/// NOTE(review): in this file the values resolved from `configurable` are
/// computed but not applied; every call is executed by `default`.
#[derive(Debug)]
pub struct RunnableConfigurableFields<I, O>
where
    I: Send + Sync + Clone + Debug + 'static,
    O: Send + Sync + Clone + Debug + 'static,
{
    /// The wrapped runnable that calls are delegated to.
    pub default: Arc<dyn Runnable<Input = I, Output = O> + Send + Sync>,
    /// Configurable field descriptors, keyed by field name. Each descriptor
    /// carries the `id` that is looked up in `RunnableConfig::configurable`.
    pub fields: HashMap<String, AnyConfigurableField>,
    /// Optional base config; it is merged with the per-call config (base
    /// listed first) before a call is prepared.
    pub config: Option<RunnableConfig>,
}
154
155impl<I, O> Clone for RunnableConfigurableFields<I, O>
156where
157 I: Send + Sync + Clone + Debug + 'static,
158 O: Send + Sync + Clone + Debug + 'static,
159{
160 fn clone(&self) -> Self {
161 Self {
162 default: Arc::clone(&self.default),
163 fields: self.fields.clone(),
164 config: self.config.clone(),
165 }
166 }
167}
168
169impl<I, O> RunnableConfigurableFields<I, O>
170where
171 I: Send + Sync + Clone + Debug + 'static,
172 O: Send + Sync + Clone + Debug + 'static,
173{
174 pub fn new(
176 default: Arc<dyn Runnable<Input = I, Output = O> + Send + Sync>,
177 fields: HashMap<String, AnyConfigurableField>,
178 ) -> Self {
179 Self {
180 default,
181 fields,
182 config: None,
183 }
184 }
185
186 pub fn with_config(mut self, config: RunnableConfig) -> Self {
188 self.config = Some(config);
189 self
190 }
191
192 pub fn config_specs(&self) -> Result<Vec<ConfigurableFieldSpec>> {
194 let mut config_specs = Vec::new();
195
196 for spec in self.fields.values() {
197 match spec {
198 AnyConfigurableField::Field(field) => {
199 config_specs.push(ConfigurableFieldSpec {
200 id: field.id.clone(),
201 annotation: field
202 .annotation
203 .clone()
204 .unwrap_or_else(|| "Any".to_string()),
205 name: field.name.clone(),
206 description: field.description.clone(),
207 default: None, is_shared: field.is_shared,
209 dependencies: None,
210 });
211 }
212 AnyConfigurableField::SingleOption(opt) => {
213 config_specs.push(make_options_spec_single(opt, None));
214 }
215 AnyConfigurableField::MultiOption(opt) => {
216 config_specs.push(make_options_spec_multi(opt, None));
217 }
218 }
219 }
220
221 get_unique_config_specs(config_specs).map_err(Error::other)
222 }
223
224 fn prepare_internal(
229 &self,
230 config: Option<RunnableConfig>,
231 ) -> (
232 Arc<dyn Runnable<Input = I, Output = O> + Send + Sync>,
233 RunnableConfig,
234 ) {
235 let merged = merge_configs(vec![self.config.clone(), config]);
236 let config = ensure_config(Some(merged));
237
238 let specs_by_id: HashMap<String, (&str, &AnyConfigurableField)> = self
240 .fields
241 .iter()
242 .map(|(key, spec)| {
243 let id = match spec {
244 AnyConfigurableField::Field(f) => f.id.clone(),
245 AnyConfigurableField::SingleOption(o) => o.id.clone(),
246 AnyConfigurableField::MultiOption(o) => o.id.clone(),
247 };
248 (id, (key.as_str(), spec))
249 })
250 .collect();
251
252 let mut configurable_fields: HashMap<String, Value> = HashMap::new();
254
255 for (key, value) in config.configurable.iter() {
256 if let Some((field_name, spec)) = specs_by_id.get(key) {
257 match spec {
258 AnyConfigurableField::Field(_) => {
259 configurable_fields.insert(field_name.to_string(), value.clone());
260 }
261 AnyConfigurableField::SingleOption(opt) => {
262 if let Some(selected_key) = value.as_str()
264 && let Some(option_value) = opt.options.get(selected_key)
265 {
266 configurable_fields
267 .insert(field_name.to_string(), option_value.clone());
268 }
269 }
270 AnyConfigurableField::MultiOption(opt) => {
271 if let Some(selected_keys) = value.as_array() {
273 let values: Vec<Value> = selected_keys
274 .iter()
275 .filter_map(|k| k.as_str())
276 .filter_map(|k| opt.options.get(k).cloned())
277 .collect();
278 configurable_fields
279 .insert(field_name.to_string(), Value::Array(values));
280 }
281 }
282 }
283 }
284 }
285
286 if configurable_fields.is_empty() {
288 return (Arc::clone(&self.default), config);
289 }
290
291 (Arc::clone(&self.default), config)
295 }
296}
297
298#[async_trait]
299impl<I, O> Runnable for RunnableConfigurableFields<I, O>
300where
301 I: Send + Sync + Clone + Debug + 'static,
302 O: Send + Sync + Clone + Debug + 'static,
303{
304 type Input = I;
305 type Output = O;
306
307 fn name(&self) -> Option<String> {
308 self.default.name()
309 }
310
311 fn invoke(&self, input: Self::Input, config: Option<RunnableConfig>) -> Result<Self::Output> {
312 let (runnable, config) = self.prepare_internal(config);
313 runnable.invoke(input, Some(config))
314 }
315
316 async fn ainvoke(
317 &self,
318 input: Self::Input,
319 config: Option<RunnableConfig>,
320 ) -> Result<Self::Output>
321 where
322 Self: 'static,
323 {
324 let (runnable, config) = self.prepare_internal(config);
325 runnable.ainvoke(input, Some(config)).await
326 }
327
328 fn batch(
329 &self,
330 inputs: Vec<Self::Input>,
331 config: Option<ConfigOrList>,
332 return_exceptions: bool,
333 ) -> Vec<Result<Self::Output>>
334 where
335 Self: 'static,
336 {
337 if inputs.is_empty() {
338 return Vec::new();
339 }
340
341 let configs = get_config_list(config, inputs.len());
342 let prepared: Vec<_> = configs
343 .iter()
344 .map(|c| self.prepare_internal(Some(c.clone())))
345 .collect();
346
347 let all_default = prepared.iter().all(|(r, _)| Arc::ptr_eq(r, &self.default));
349
350 if all_default {
351 let prepared_configs: Vec<_> = prepared.into_iter().map(|(_, c)| c).collect();
352 return self.default.batch(
353 inputs,
354 Some(ConfigOrList::List(prepared_configs)),
355 return_exceptions,
356 );
357 }
358
359 inputs
361 .into_iter()
362 .zip(prepared)
363 .map(|(input, (runnable, config))| runnable.invoke(input, Some(config)))
364 .collect()
365 }
366
367 async fn abatch(
368 &self,
369 inputs: Vec<Self::Input>,
370 config: Option<ConfigOrList>,
371 return_exceptions: bool,
372 ) -> Vec<Result<Self::Output>>
373 where
374 Self: 'static,
375 {
376 if inputs.is_empty() {
377 return Vec::new();
378 }
379
380 let configs = get_config_list(config, inputs.len());
381 let prepared: Vec<_> = configs
382 .iter()
383 .map(|c| self.prepare_internal(Some(c.clone())))
384 .collect();
385
386 let all_default = prepared.iter().all(|(r, _)| Arc::ptr_eq(r, &self.default));
388
389 if all_default {
390 let prepared_configs: Vec<_> = prepared.into_iter().map(|(_, c)| c).collect();
391 return self
392 .default
393 .abatch(
394 inputs,
395 Some(ConfigOrList::List(prepared_configs)),
396 return_exceptions,
397 )
398 .await;
399 }
400
401 let max_concurrency = configs.first().and_then(|c| c.max_concurrency);
403
404 let futures: Vec<_> = inputs
405 .into_iter()
406 .zip(prepared)
407 .map(|(input, (runnable, config))| {
408 Box::pin(async move { runnable.ainvoke(input, Some(config)).await })
409 as std::pin::Pin<Box<dyn std::future::Future<Output = Result<O>> + Send>>
410 })
411 .collect();
412
413 gather_with_concurrency(max_concurrency, futures).await
414 }
415
416 fn stream(
417 &self,
418 input: Self::Input,
419 config: Option<RunnableConfig>,
420 ) -> BoxStream<'_, Result<Self::Output>> {
421 let (runnable, config) = self.prepare_internal(config);
422 Box::pin(async_stream::stream! {
426 let result = runnable.invoke(input, Some(config));
427 yield result;
428 })
429 }
430}
431
/// A runnable that delegates each call to either a default runnable or one of
/// several named alternatives, chosen through the `configurable` map of a
/// [`RunnableConfig`].
#[derive(Debug)]
pub struct RunnableConfigurableAlternatives<I, O>
where
    I: Send + Sync + Clone + Debug + 'static,
    O: Send + Sync + Clone + Debug + 'static,
{
    /// The field whose `id` is looked up in `configurable` to read the
    /// selected key.
    pub which: ConfigurableField,
    /// Runnable used when the selected key equals `default_key`.
    pub default: Arc<dyn Runnable<Input = I, Output = O> + Send + Sync>,
    /// Alternatives, keyed by the key that selects them.
    pub alternatives: HashMap<String, Alternative<I, O>>,
    /// Key that selects `default`; also used when `configurable` holds no
    /// string value under `which.id`.
    pub default_key: String,
    /// When `true`, the prefix `"{which.id}=={selected key}/"` is stripped
    /// from `configurable` keys before the config is passed on.
    pub prefix_keys: bool,
    /// Optional base config; it is merged with the per-call config (base
    /// listed first) before a call is prepared.
    pub config: Option<RunnableConfig>,
}
455
/// One selectable alternative of a `RunnableConfigurableAlternatives`.
pub enum Alternative<I, O>
where
    I: Send + Sync + Clone + Debug + 'static,
    O: Send + Sync + Clone + Debug + 'static,
{
    /// An already constructed runnable; the same instance is shared on every
    /// selection.
    Runnable(Arc<dyn Runnable<Input = I, Output = O> + Send + Sync>),
    /// A factory that is called each time this alternative is selected and
    /// returns the runnable to execute.
    Factory(Arc<dyn Fn() -> Arc<dyn Runnable<Input = I, Output = O> + Send + Sync> + Send + Sync>),
}
467
468impl<I, O> Debug for Alternative<I, O>
469where
470 I: Send + Sync + Clone + Debug + 'static,
471 O: Send + Sync + Clone + Debug + 'static,
472{
473 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
474 match self {
475 Alternative::Runnable(_) => write!(f, "Alternative::Runnable(...)"),
476 Alternative::Factory(_) => write!(f, "Alternative::Factory(...)"),
477 }
478 }
479}
480
481impl<I, O> Clone for Alternative<I, O>
482where
483 I: Send + Sync + Clone + Debug + 'static,
484 O: Send + Sync + Clone + Debug + 'static,
485{
486 fn clone(&self) -> Self {
487 match self {
488 Alternative::Runnable(r) => Alternative::Runnable(Arc::clone(r)),
489 Alternative::Factory(f) => Alternative::Factory(Arc::clone(f)),
490 }
491 }
492}
493
494impl<I, O> Clone for RunnableConfigurableAlternatives<I, O>
495where
496 I: Send + Sync + Clone + Debug + 'static,
497 O: Send + Sync + Clone + Debug + 'static,
498{
499 fn clone(&self) -> Self {
500 Self {
501 which: self.which.clone(),
502 default: Arc::clone(&self.default),
503 alternatives: self.alternatives.clone(),
504 default_key: self.default_key.clone(),
505 prefix_keys: self.prefix_keys,
506 config: self.config.clone(),
507 }
508 }
509}
510
511impl<I, O> RunnableConfigurableAlternatives<I, O>
512where
513 I: Send + Sync + Clone + Debug + 'static,
514 O: Send + Sync + Clone + Debug + 'static,
515{
516 pub fn new(
518 which: ConfigurableField,
519 default: Arc<dyn Runnable<Input = I, Output = O> + Send + Sync>,
520 alternatives: HashMap<String, Alternative<I, O>>,
521 default_key: impl Into<String>,
522 prefix_keys: bool,
523 ) -> Self {
524 Self {
525 which,
526 default,
527 alternatives,
528 default_key: default_key.into(),
529 prefix_keys,
530 config: None,
531 }
532 }
533
534 pub fn with_config(mut self, config: RunnableConfig) -> Self {
536 self.config = Some(config);
537 self
538 }
539
540 pub fn config_specs(&self) -> Result<Vec<ConfigurableFieldSpec>> {
542 let mut all_keys: Vec<String> = self.alternatives.keys().cloned().collect();
543 all_keys.push(self.default_key.clone());
544
545 let which_spec = ConfigurableFieldSpec {
546 id: self.which.id.clone(),
547 annotation: format!("Enum[{}]", all_keys.join(", ")),
548 name: self.which.name.clone(),
549 description: self.which.description.clone(),
550 default: Some(Value::String(self.default_key.clone())),
551 is_shared: self.which.is_shared,
552 dependencies: None,
553 };
554
555 let specs = vec![which_spec];
556
557 get_unique_config_specs(specs).map_err(Error::other)
562 }
563
564 fn prepare_internal(
566 &self,
567 config: Option<RunnableConfig>,
568 ) -> Result<(
569 Arc<dyn Runnable<Input = I, Output = O> + Send + Sync>,
570 RunnableConfig,
571 )> {
572 let merged = merge_configs(vec![self.config.clone(), config]);
573 let config = ensure_config(Some(merged));
574
575 let which = config
577 .configurable
578 .get(&self.which.id)
579 .and_then(|v| v.as_str())
580 .map(|s| s.to_string())
581 .unwrap_or_else(|| self.default_key.clone());
582
583 let config = if self.prefix_keys {
585 let prefix = format!("{}=={}/", self.which.id, which);
586 let new_configurable: HashMap<String, Value> = config
587 .configurable
588 .iter()
589 .map(|(k, v)| (str_remove_prefix(k, &prefix), v.clone()))
590 .collect();
591 RunnableConfig {
592 configurable: new_configurable,
593 ..config
594 }
595 } else {
596 config
597 };
598
599 if which == self.default_key {
601 return Ok((Arc::clone(&self.default), config));
602 }
603
604 if let Some(alt) = self.alternatives.get(&which) {
605 let runnable = match alt {
606 Alternative::Runnable(r) => Arc::clone(r),
607 Alternative::Factory(f) => f(),
608 };
609 return Ok((runnable, config));
610 }
611
612 Err(Error::other(format!("Unknown alternative: {}", which)))
613 }
614}
615
616#[async_trait]
617impl<I, O> Runnable for RunnableConfigurableAlternatives<I, O>
618where
619 I: Send + Sync + Clone + Debug + 'static,
620 O: Send + Sync + Clone + Debug + 'static,
621{
622 type Input = I;
623 type Output = O;
624
625 fn name(&self) -> Option<String> {
626 self.default.name()
627 }
628
629 fn invoke(&self, input: Self::Input, config: Option<RunnableConfig>) -> Result<Self::Output> {
630 let (runnable, config) = self.prepare_internal(config)?;
631 runnable.invoke(input, Some(config))
632 }
633
634 async fn ainvoke(
635 &self,
636 input: Self::Input,
637 config: Option<RunnableConfig>,
638 ) -> Result<Self::Output>
639 where
640 Self: 'static,
641 {
642 let (runnable, config) = self.prepare_internal(config)?;
643 runnable.ainvoke(input, Some(config)).await
644 }
645
646 fn batch(
647 &self,
648 inputs: Vec<Self::Input>,
649 config: Option<ConfigOrList>,
650 return_exceptions: bool,
651 ) -> Vec<Result<Self::Output>>
652 where
653 Self: 'static,
654 {
655 if inputs.is_empty() {
656 return Vec::new();
657 }
658
659 let configs = get_config_list(config, inputs.len());
660 let prepared: Vec<_> = configs
661 .iter()
662 .map(|c| self.prepare_internal(Some(c.clone())))
663 .collect();
664
665 let all_default = prepared.iter().all(|r| {
667 r.as_ref()
668 .map(|(runnable, _)| Arc::ptr_eq(runnable, &self.default))
669 .unwrap_or(false)
670 });
671
672 if all_default {
673 let prepared_configs: Vec<_> = prepared
674 .into_iter()
675 .filter_map(|r| r.ok())
676 .map(|(_, c)| c)
677 .collect();
678 return self.default.batch(
679 inputs,
680 Some(ConfigOrList::List(prepared_configs)),
681 return_exceptions,
682 );
683 }
684
685 inputs
687 .into_iter()
688 .zip(prepared)
689 .map(|(input, prepared_result)| match prepared_result {
690 Ok((runnable, config)) => runnable.invoke(input, Some(config)),
691 Err(e) => Err(e),
692 })
693 .collect()
694 }
695
696 async fn abatch(
697 &self,
698 inputs: Vec<Self::Input>,
699 config: Option<ConfigOrList>,
700 return_exceptions: bool,
701 ) -> Vec<Result<Self::Output>>
702 where
703 Self: 'static,
704 {
705 if inputs.is_empty() {
706 return Vec::new();
707 }
708
709 let configs = get_config_list(config, inputs.len());
710 let prepared: Vec<_> = configs
711 .iter()
712 .map(|c| self.prepare_internal(Some(c.clone())))
713 .collect();
714
715 let all_default = prepared.iter().all(|r| {
717 r.as_ref()
718 .map(|(runnable, _)| Arc::ptr_eq(runnable, &self.default))
719 .unwrap_or(false)
720 });
721
722 if all_default {
723 let prepared_configs: Vec<_> = prepared
724 .into_iter()
725 .filter_map(|r| r.ok())
726 .map(|(_, c)| c)
727 .collect();
728 return self
729 .default
730 .abatch(
731 inputs,
732 Some(ConfigOrList::List(prepared_configs)),
733 return_exceptions,
734 )
735 .await;
736 }
737
738 let mut results = Vec::with_capacity(inputs.len());
740 for (input, prepared_result) in inputs.into_iter().zip(prepared) {
741 let result = match prepared_result {
742 Ok((runnable, config)) => runnable.ainvoke(input, Some(config)).await,
743 Err(e) => Err(e),
744 };
745 results.push(result);
746 }
747
748 results
749 }
750
751 fn stream(
752 &self,
753 input: Self::Input,
754 config: Option<RunnableConfig>,
755 ) -> BoxStream<'_, Result<Self::Output>> {
756 Box::pin(async_stream::stream! {
757 match self.prepare_internal(config) {
758 Ok((runnable, config)) => {
759 let result = runnable.invoke(input, Some(config));
760 yield result;
761 }
762 Err(e) => {
763 yield Err(e);
764 }
765 }
766 })
767 }
768}
769
770pub trait ConfigurableRunnable: Runnable + Sized {
772 fn configurable_fields(
792 self,
793 fields: HashMap<String, AnyConfigurableField>,
794 ) -> RunnableConfigurableFields<Self::Input, Self::Output>
795 where
796 Self: Send + Sync + 'static,
797 {
798 RunnableConfigurableFields::new(Arc::new(self), fields)
799 }
800
801 fn configurable_alternatives(
823 self,
824 which: ConfigurableField,
825 alternatives: HashMap<String, Alternative<Self::Input, Self::Output>>,
826 default_key: impl Into<String>,
827 prefix_keys: bool,
828 ) -> RunnableConfigurableAlternatives<Self::Input, Self::Output>
829 where
830 Self: Send + Sync + 'static,
831 {
832 RunnableConfigurableAlternatives::new(
833 which,
834 Arc::new(self),
835 alternatives,
836 default_key,
837 prefix_keys,
838 )
839 }
840}
841
842impl<R> ConfigurableRunnable for R where R: Runnable + Sized {}
844
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runnables::base::RunnableLambda;

    /// Builds a JSON number from a finite float.
    fn number(x: f64) -> Value {
        Value::Number(serde_json::Number::from_f64(x).unwrap())
    }

    /// Builds a config whose `configurable` map holds a single string entry.
    fn config_with(key: &str, choice: &str) -> RunnableConfig {
        let mut config = RunnableConfig::default();
        config
            .configurable
            .insert(key.to_string(), Value::String(choice.to_string()));
        config
    }

    // Non-shared specs get the prefix; shared specs keep their id.
    #[test]
    fn test_prefix_config_spec() {
        let private_spec = ConfigurableFieldSpec {
            id: "temperature".to_string(),
            annotation: "float".to_string(),
            name: Some("Temperature".to_string()),
            description: None,
            default: Some(number(0.7)),
            is_shared: false,
            dependencies: None,
        };
        assert_eq!(
            prefix_config_spec(&private_spec, "model==gpt4").id,
            "model==gpt4/temperature"
        );

        let shared_spec = ConfigurableFieldSpec {
            is_shared: true,
            ..private_spec
        };
        assert_eq!(
            prefix_config_spec(&shared_spec, "model==gpt4").id,
            "temperature"
        );
    }

    // The prefix is stripped when present and the input is untouched otherwise.
    #[test]
    fn test_str_remove_prefix() {
        let prefix = "model==gpt4/";
        assert_eq!(
            str_remove_prefix("model==gpt4/temperature", prefix),
            "temperature"
        );
        assert_eq!(str_remove_prefix("temperature", prefix), "temperature");
    }

    // The spec carries the field id, an Enum annotation and the default key.
    #[test]
    fn test_make_options_spec_single() {
        let options = HashMap::from([
            ("low".to_string(), number(0.3)),
            ("high".to_string(), number(0.9)),
        ]);

        let field = ConfigurableFieldSingleOption {
            id: "temp_preset".to_string(),
            options,
            default: "low".to_string(),
            name: Some("Temperature Preset".to_string()),
            description: None,
            is_shared: false,
        };

        let config_spec = make_options_spec_single(&field, Some("Choose temperature"));
        assert_eq!(config_spec.id, "temp_preset");
        assert!(config_spec.annotation.contains("Enum"));
        assert_eq!(config_spec.default, Some(Value::String("low".to_string())));
    }

    // With no fields and no config the wrapper behaves like the wrapped runnable.
    #[test]
    fn test_configurable_fields_invoke() {
        let configurable =
            RunnableLambda::new(|x: i32| Ok(x * 2)).configurable_fields(HashMap::new());

        assert_eq!(configurable.invoke(5, None).unwrap(), 10);
    }

    // No selection runs the default; selecting "triple" runs the alternative.
    #[test]
    fn test_configurable_alternatives_invoke() {
        let default = RunnableLambda::new(|x: i32| Ok(x * 2));
        let alt = RunnableLambda::new(|x: i32| Ok(x * 3));

        let mut alternatives = HashMap::new();
        alternatives.insert("triple".to_string(), Alternative::Runnable(Arc::new(alt)));

        let configurable = default.configurable_alternatives(
            ConfigurableField::new("multiplier"),
            alternatives,
            "double",
            false,
        );

        assert_eq!(configurable.invoke(5, None).unwrap(), 10);

        let selected = configurable.invoke(5, Some(config_with("multiplier", "triple")));
        assert_eq!(selected.unwrap(), 15);
    }

    // Selecting a key that is not registered is an error.
    #[test]
    fn test_configurable_alternatives_unknown() {
        let configurable = RunnableLambda::new(|x: i32| Ok(x * 2)).configurable_alternatives(
            ConfigurableField::new("multiplier"),
            HashMap::new(),
            "double",
            false,
        );

        let outcome = configurable.invoke(5, Some(config_with("multiplier", "unknown")));
        assert!(outcome.is_err());
    }

    // Async variant of the plain configurable-fields invocation.
    #[tokio::test]
    async fn test_configurable_fields_ainvoke() {
        let configurable =
            RunnableLambda::new(|x: i32| Ok(x * 2)).configurable_fields(HashMap::new());

        assert_eq!(configurable.ainvoke(5, None).await.unwrap(), 10);
    }

    // Async variant of selecting a registered alternative.
    #[tokio::test]
    async fn test_configurable_alternatives_ainvoke() {
        let default = RunnableLambda::new(|x: i32| Ok(x * 2));
        let alt = RunnableLambda::new(|x: i32| Ok(x + 100));

        let mut alternatives = HashMap::new();
        alternatives.insert(
            "add_hundred".to_string(),
            Alternative::Runnable(Arc::new(alt)),
        );

        let configurable = default.configurable_alternatives(
            ConfigurableField::new("operation"),
            alternatives,
            "double",
            false,
        );

        let selected = configurable
            .ainvoke(5, Some(config_with("operation", "add_hundred")))
            .await;
        assert_eq!(selected.unwrap(), 105);
    }

    // An alternative supplied as a factory is built and used on selection.
    #[test]
    fn test_configurable_with_factory() {
        let default = RunnableLambda::new(|x: i32| Ok(x * 2));

        let mut alternatives = HashMap::new();
        alternatives.insert(
            "triple".to_string(),
            Alternative::Factory(Arc::new(|| {
                Arc::new(RunnableLambda::new(|x: i32| Ok(x * 3)))
                    as Arc<dyn Runnable<Input = i32, Output = i32> + Send + Sync>
            })),
        );

        let configurable = default.configurable_alternatives(
            ConfigurableField::new("multiplier"),
            alternatives,
            "double",
            false,
        );

        let selected = configurable.invoke(5, Some(config_with("multiplier", "triple")));
        assert_eq!(selected.unwrap(), 15);
    }
}