1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use futures::stream::{Stream, StreamExt};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use tokio::sync::Semaphore;
10
11pub async fn gated_coro<T>(semaphore: Arc<Semaphore>, fut: impl Future<Output = T>) -> T {
23 let _permit = semaphore
24 .acquire()
25 .await
26 .expect("semaphore should not be closed");
27 fut.await
28}
29
30pub async fn gather_with_concurrency<T: Send + 'static>(
42 n: Option<usize>,
43 futures: Vec<Pin<Box<dyn Future<Output = T> + Send>>>,
44) -> Vec<T> {
45 if futures.is_empty() {
46 return Vec::new();
47 }
48
49 match n {
50 Some(limit) if limit > 0 => {
51 let semaphore = Arc::new(Semaphore::new(limit));
52 let gated_futures: Vec<_> = futures
53 .into_iter()
54 .map(|fut| {
55 let sem = semaphore.clone();
56 Box::pin(gated_coro(sem, fut)) as Pin<Box<dyn Future<Output = T> + Send>>
57 })
58 .collect();
59 futures::future::join_all(gated_futures).await
60 }
61 _ => futures::future::join_all(futures).await,
62 }
63}
64
65pub fn indent_lines_after_first(text: &str, prefix: &str) -> String {
74 let n_spaces = prefix.len();
75 let spaces = " ".repeat(n_spaces);
76 let lines: Vec<&str> = text.lines().collect();
77
78 if lines.is_empty() {
79 return String::new();
80 }
81
82 let mut result = lines[0].to_string();
83 for line in &lines[1..] {
84 result.push('\n');
85 result.push_str(&spaces);
86 result.push_str(line);
87 }
88
89 result
90}
91
92#[derive(Debug, Clone, Default, Serialize, Deserialize)]
100pub struct AddableDict(pub HashMap<String, Value>);
101
102impl AddableDict {
103 pub fn new() -> Self {
105 Self(HashMap::new())
106 }
107
108 pub fn from_map(map: HashMap<String, Value>) -> Self {
110 Self(map)
111 }
112
113 pub fn insert(&mut self, key: impl Into<String>, value: Value) {
115 self.0.insert(key.into(), value);
116 }
117
118 pub fn get(&self, key: &str) -> Option<&Value> {
120 self.0.get(key)
121 }
122
123 pub fn contains_key(&self, key: &str) -> bool {
125 self.0.contains_key(key)
126 }
127
128 pub fn iter(&self) -> impl Iterator<Item = (&String, &Value)> {
130 self.0.iter()
131 }
132
133 pub fn len(&self) -> usize {
135 self.0.len()
136 }
137
138 pub fn is_empty(&self) -> bool {
140 self.0.is_empty()
141 }
142}
143
144impl std::ops::Add for AddableDict {
145 type Output = Self;
146
147 fn add(self, other: Self) -> Self::Output {
148 let mut chunk = self.clone();
149
150 for (key, value) in other.0 {
151 match chunk.0.get(&key) {
152 None => {
153 chunk.0.insert(key, value);
154 }
155 Some(existing) if existing.is_null() => {
156 chunk.0.insert(key, value);
157 }
158 Some(existing) if !value.is_null() => {
159 let added = try_add_values(existing, &value);
160 chunk.0.insert(key, added);
161 }
162 _ => {}
163 }
164 }
165
166 chunk
167 }
168}
169
170impl std::ops::AddAssign for AddableDict {
171 fn add_assign(&mut self, other: Self) {
172 *self = self.clone() + other;
173 }
174}
175
176fn try_add_values(a: &Value, b: &Value) -> Value {
184 match (a, b) {
185 (Value::String(s1), Value::String(s2)) => Value::String(format!("{}{}", s1, s2)),
186 (Value::Array(arr1), Value::Array(arr2)) => {
187 let mut result = arr1.clone();
188 result.extend(arr2.clone());
189 Value::Array(result)
190 }
191 (Value::Object(obj1), Value::Object(obj2)) => {
192 let mut result = obj1.clone();
193 for (k, v) in obj2 {
194 result.insert(k.clone(), v.clone());
195 }
196 Value::Object(result)
197 }
198 (Value::Number(n1), Value::Number(n2)) => {
199 if let (Some(i1), Some(i2)) = (n1.as_i64(), n2.as_i64()) {
200 Value::Number((i1 + i2).into())
201 } else if let (Some(f1), Some(f2)) = (n1.as_f64(), n2.as_f64()) {
202 serde_json::Number::from_f64(f1 + f2)
203 .map(Value::Number)
204 .unwrap_or_else(|| b.clone())
205 } else {
206 b.clone()
207 }
208 }
209 _ => b.clone(),
210 }
211}
212
213pub trait Addable: Clone {
217 fn add(self, other: Self) -> Self;
219}
220
221impl<T: Clone + std::ops::Add<Output = T>> Addable for T {
222 fn add(self, other: Self) -> Self {
223 self + other
224 }
225}
226
227pub fn add<T: Addable>(addables: impl IntoIterator<Item = T>) -> Option<T> {
235 let mut final_value: Option<T> = None;
236
237 for chunk in addables {
238 final_value = match final_value {
239 None => Some(chunk),
240 Some(prev) => Some(prev.add(chunk)),
241 };
242 }
243
244 final_value
245}
246
247pub async fn aadd<T: Addable>(addables: impl Stream<Item = T> + Unpin) -> Option<T> {
255 let mut final_value: Option<T> = None;
256 let mut stream = addables;
257
258 while let Some(chunk) = stream.next().await {
259 final_value = match final_value {
260 None => Some(chunk),
261 Some(prev) => Some(prev.add(chunk)),
262 };
263 }
264
265 final_value
266}
267
268#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
272pub struct ConfigurableField {
273 pub id: String,
275 pub name: Option<String>,
277 pub description: Option<String>,
279 pub annotation: Option<String>,
281 pub is_shared: bool,
283}
284
285impl ConfigurableField {
286 pub fn new(id: impl Into<String>) -> Self {
287 Self {
288 id: id.into(),
289 name: None,
290 description: None,
291 annotation: None,
292 is_shared: false,
293 }
294 }
295
296 pub fn with_name(mut self, name: impl Into<String>) -> Self {
297 self.name = Some(name.into());
298 self
299 }
300
301 pub fn with_description(mut self, description: impl Into<String>) -> Self {
302 self.description = Some(description.into());
303 self
304 }
305
306 pub fn with_annotation(mut self, annotation: impl Into<String>) -> Self {
307 self.annotation = Some(annotation.into());
308 self
309 }
310
311 pub fn with_shared(mut self, is_shared: bool) -> Self {
312 self.is_shared = is_shared;
313 self
314 }
315}
316
317#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
321pub struct ConfigurableFieldSingleOption {
322 pub id: String,
324 pub options: HashMap<String, serde_json::Value>,
326 pub default: String,
328 pub name: Option<String>,
330 pub description: Option<String>,
332 pub is_shared: bool,
334}
335
336impl std::hash::Hash for ConfigurableFieldSingleOption {
337 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
338 self.id.hash(state);
339 let mut keys: Vec<_> = self.options.keys().collect();
340 keys.sort();
341 for key in keys {
342 key.hash(state);
343 }
344 self.default.hash(state);
345 }
346}
347
348impl ConfigurableFieldSingleOption {
349 pub fn new(
350 id: impl Into<String>,
351 options: HashMap<String, serde_json::Value>,
352 default: impl Into<String>,
353 ) -> Self {
354 Self {
355 id: id.into(),
356 options,
357 default: default.into(),
358 name: None,
359 description: None,
360 is_shared: false,
361 }
362 }
363}
364
365#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
369pub struct ConfigurableFieldMultiOption {
370 pub id: String,
372 pub options: HashMap<String, serde_json::Value>,
374 pub default: Vec<String>,
376 pub name: Option<String>,
378 pub description: Option<String>,
380 pub is_shared: bool,
382}
383
384impl std::hash::Hash for ConfigurableFieldMultiOption {
385 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
386 self.id.hash(state);
387 let mut keys: Vec<_> = self.options.keys().collect();
388 keys.sort();
389 for key in keys {
390 key.hash(state);
391 }
392 for d in &self.default {
393 d.hash(state);
394 }
395 }
396}
397
398impl ConfigurableFieldMultiOption {
399 pub fn new(
400 id: impl Into<String>,
401 options: HashMap<String, serde_json::Value>,
402 default: Vec<String>,
403 ) -> Self {
404 Self {
405 id: id.into(),
406 options,
407 default,
408 name: None,
409 description: None,
410 is_shared: false,
411 }
412 }
413}
414
415#[derive(Debug, Clone, Serialize, Deserialize)]
417pub enum AnyConfigurableField {
418 Field(ConfigurableField),
419 SingleOption(ConfigurableFieldSingleOption),
420 MultiOption(ConfigurableFieldMultiOption),
421}
422
423#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
427pub struct ConfigurableFieldSpec {
428 pub id: String,
430 pub annotation: String,
432 pub name: Option<String>,
434 pub description: Option<String>,
436 pub default: Option<serde_json::Value>,
438 pub is_shared: bool,
440 pub dependencies: Option<Vec<String>>,
442}
443
444impl ConfigurableFieldSpec {
445 pub fn new(id: impl Into<String>, annotation: impl Into<String>) -> Self {
446 Self {
447 id: id.into(),
448 annotation: annotation.into(),
449 name: None,
450 description: None,
451 default: None,
452 is_shared: false,
453 dependencies: None,
454 }
455 }
456}
457
458pub fn get_unique_config_specs(
469 specs: impl IntoIterator<Item = ConfigurableFieldSpec>,
470) -> Result<Vec<ConfigurableFieldSpec>, String> {
471 use std::collections::BTreeMap;
472
473 let mut grouped: BTreeMap<String, Vec<ConfigurableFieldSpec>> = BTreeMap::new();
474
475 for spec in specs {
476 grouped.entry(spec.id.clone()).or_default().push(spec);
477 }
478
479 let mut unique = Vec::new();
480
481 for (spec_id, dupes) in grouped {
482 if dupes.is_empty() {
483 continue;
484 }
485
486 let first = &dupes[0];
487
488 if dupes.len() == 1 || dupes.iter().skip(1).all(|s| s == first) {
489 unique.push(first.clone());
490 } else {
491 return Err(format!(
492 "RunnableSequence contains conflicting config specs for {}: {:?}",
493 spec_id, dupes
494 ));
495 }
496 }
497
498 Ok(unique)
499}
500
501pub struct RootEventFilter {
506 pub include_names: Option<Vec<String>>,
508 pub include_types: Option<Vec<String>>,
510 pub include_tags: Option<Vec<String>>,
512 pub exclude_names: Option<Vec<String>>,
514 pub exclude_types: Option<Vec<String>>,
516 pub exclude_tags: Option<Vec<String>>,
518}
519
520impl RootEventFilter {
521 pub fn new() -> Self {
523 Self {
524 include_names: None,
525 include_types: None,
526 include_tags: None,
527 exclude_names: None,
528 exclude_types: None,
529 exclude_tags: None,
530 }
531 }
532
533 pub fn include_event(&self, event_name: &str, event_tags: &[String], root_type: &str) -> bool {
543 let mut include = self.include_names.is_none()
544 && self.include_types.is_none()
545 && self.include_tags.is_none();
546
547 if let Some(names) = &self.include_names {
548 include = include || names.iter().any(|n| n == event_name);
549 }
550
551 if let Some(types) = &self.include_types {
552 include = include || types.iter().any(|t| t == root_type);
553 }
554
555 if let Some(tags) = &self.include_tags {
556 include = include || event_tags.iter().any(|tag| tags.contains(tag));
557 }
558
559 if let Some(names) = &self.exclude_names {
560 include = include && !names.iter().any(|n| n == event_name);
561 }
562
563 if let Some(types) = &self.exclude_types {
564 include = include && !types.iter().any(|t| t == root_type);
565 }
566
567 if let Some(tags) = &self.exclude_tags {
568 include = include && !event_tags.iter().any(|tag| tags.contains(tag));
569 }
570
571 include
572 }
573}
574
575impl Default for RootEventFilter {
576 fn default() -> Self {
577 Self::new()
578 }
579}
580
581pub fn is_async_generator<F, S, T>(_f: F) -> bool
621where
622 F: Fn() -> S,
623 S: Stream<Item = T>,
624{
625 true
626}
627
628pub fn is_async_callable<F, Fut>(_f: F) -> bool
640where
641 F: Fn() -> Fut,
642 Fut: Future,
643{
644 true
645}
646
647#[cfg(test)]
648mod tests {
649 use super::*;
650
651 #[test]
652 fn test_indent_lines_after_first() {
653 let text = "line1\nline2\nline3";
654 let result = indent_lines_after_first(text, " ");
655 assert_eq!(result, "line1\n line2\n line3");
656 }
657
658 #[test]
659 fn test_add() {
660 let values = vec![1, 2, 3, 4, 5];
661 let result = add(values);
662 assert_eq!(result, Some(15));
663
664 let empty: Vec<i32> = vec![];
665 let result = add(empty);
666 assert_eq!(result, None);
667 }
668
669 #[tokio::test]
670 async fn test_gather_with_concurrency() {
671 let futures: Vec<Pin<Box<dyn Future<Output = i32> + Send>>> = vec![
672 Box::pin(async { 1 }),
673 Box::pin(async { 2 }),
674 Box::pin(async { 3 }),
675 ];
676
677 let results = gather_with_concurrency(Some(2), futures).await;
678 assert_eq!(results, vec![1, 2, 3]);
679 }
680
681 #[tokio::test]
682 async fn test_gather_with_concurrency_no_limit() {
683 let futures: Vec<Pin<Box<dyn Future<Output = i32> + Send>>> = vec![
684 Box::pin(async { 1 }),
685 Box::pin(async { 2 }),
686 Box::pin(async { 3 }),
687 ];
688
689 let results = gather_with_concurrency(None, futures).await;
690 assert_eq!(results, vec![1, 2, 3]);
691 }
692
693 #[tokio::test]
694 async fn test_gather_with_concurrency_empty() {
695 let futures: Vec<Pin<Box<dyn Future<Output = i32> + Send>>> = vec![];
696 let results = gather_with_concurrency(Some(2), futures).await;
697 assert!(results.is_empty());
698 }
699
700 #[test]
701 fn test_addable_dict() {
702 let mut dict1 = AddableDict::new();
703 dict1.insert("a", Value::String("hello".to_string()));
704 dict1.insert("b", Value::Number(1.into()));
705
706 let mut dict2 = AddableDict::new();
707 dict2.insert("b", Value::Number(2.into()));
708 dict2.insert("c", Value::String(" world".to_string()));
709
710 let result = dict1 + dict2;
711 assert_eq!(result.get("a"), Some(&Value::String("hello".to_string())));
712 assert_eq!(result.get("b"), Some(&Value::Number(3.into())));
713 assert_eq!(result.get("c"), Some(&Value::String(" world".to_string())));
714 }
715
716 #[test]
717 fn test_configurable_field() {
718 let field = ConfigurableField::new("test_id")
719 .with_name("Test Field")
720 .with_description("A test field")
721 .with_shared(true);
722
723 assert_eq!(field.id, "test_id");
724 assert_eq!(field.name, Some("Test Field".to_string()));
725 assert_eq!(field.description, Some("A test field".to_string()));
726 assert!(field.is_shared);
727 }
728
729 #[test]
730 fn test_get_unique_config_specs() {
731 let spec1 = ConfigurableFieldSpec::new("id1", "String");
732 let spec2 = ConfigurableFieldSpec::new("id1", "String");
733 let spec3 = ConfigurableFieldSpec::new("id2", "Int");
734
735 let specs = vec![spec1, spec2, spec3];
736 let result = get_unique_config_specs(specs).unwrap();
737
738 assert_eq!(result.len(), 2);
739 assert_eq!(result[0].id, "id1");
740 assert_eq!(result[1].id, "id2");
741 }
742
743 #[test]
744 fn test_get_unique_config_specs_conflict() {
745 let spec1 = ConfigurableFieldSpec::new("id1", "String");
746 let mut spec2 = ConfigurableFieldSpec::new("id1", "String");
747 spec2.description = Some("Different".to_string());
748
749 let specs = vec![spec1, spec2];
750 let result = get_unique_config_specs(specs);
751
752 assert!(result.is_err());
753 }
754
755 #[test]
756 fn test_root_event_filter() {
757 let filter = RootEventFilter {
758 include_names: Some(vec!["test".to_string()]),
759 include_types: None,
760 include_tags: None,
761 exclude_names: None,
762 exclude_types: None,
763 exclude_tags: None,
764 };
765
766 assert!(filter.include_event("test", &[], "chain"));
767 assert!(!filter.include_event("other", &[], "chain"));
768 }
769
770 #[test]
771 fn test_root_event_filter_tags() {
772 let filter = RootEventFilter {
773 include_names: None,
774 include_types: None,
775 include_tags: Some(vec!["important".to_string()]),
776 exclude_names: None,
777 exclude_types: None,
778 exclude_tags: None,
779 };
780
781 assert!(filter.include_event("test", &["important".to_string()], "chain"));
782 assert!(!filter.include_event("test", &["unimportant".to_string()], "chain"));
783 }
784
785 #[test]
786 fn test_root_event_filter_exclude() {
787 let filter = RootEventFilter {
788 include_names: None,
789 include_types: None,
790 include_tags: None,
791 exclude_names: Some(vec!["skip".to_string()]),
792 exclude_types: None,
793 exclude_tags: None,
794 };
795
796 assert!(filter.include_event("test", &[], "chain"));
797 assert!(!filter.include_event("skip", &[], "chain"));
798 }
799}