1use std::collections::HashMap;
7use std::future::Future;
8use std::sync::Arc;
9
10use uuid::Uuid;
11
12use crate::messages::BaseMessage;
13use crate::outputs::ChatResult;
14
15use super::base::{BaseCallbackHandler, BaseCallbackManager, Callbacks};
16use crate::utils::uuid::uuid7;
17
18pub fn handle_event<F>(
22 handlers: &[Arc<dyn BaseCallbackHandler>],
23 ignore_condition: Option<fn(&dyn BaseCallbackHandler) -> bool>,
24 mut event_fn: F,
25) where
26 F: FnMut(&Arc<dyn BaseCallbackHandler>),
27{
28 for handler in handlers {
29 if let Some(ignore_fn) = ignore_condition
30 && ignore_fn(handler.as_ref())
31 {
32 continue;
33 }
34 event_fn(handler);
35 }
36}
37
38pub async fn ahandle_event<F, Fut>(
44 handlers: &[Arc<dyn BaseCallbackHandler>],
45 ignore_condition: Option<fn(&dyn BaseCallbackHandler) -> bool>,
46 event_fn: F,
47) where
48 F: Fn(&Arc<dyn BaseCallbackHandler>) -> Fut + Send + Sync,
49 Fut: Future<Output = ()> + Send,
50{
51 for handler in handlers.iter().filter(|h| h.run_inline()) {
53 if let Some(ignore_fn) = ignore_condition
54 && ignore_fn(handler.as_ref())
55 {
56 continue;
57 }
58 event_fn(handler).await;
59 }
60
61 let non_inline_futures: Vec<_> = handlers
63 .iter()
64 .filter(|h| !h.run_inline())
65 .filter(|h| {
66 if let Some(ignore_fn) = ignore_condition {
67 !ignore_fn(h.as_ref())
68 } else {
69 true
70 }
71 })
72 .map(event_fn)
73 .collect();
74
75 futures::future::join_all(non_inline_futures).await;
76}
77
78#[derive(Debug, Clone)]
80pub struct BaseRunManager {
81 pub run_id: Uuid,
83 pub handlers: Vec<Arc<dyn BaseCallbackHandler>>,
85 pub inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
87 pub parent_run_id: Option<Uuid>,
89 pub tags: Vec<String>,
91 pub inheritable_tags: Vec<String>,
93 pub metadata: HashMap<String, serde_json::Value>,
95 pub inheritable_metadata: HashMap<String, serde_json::Value>,
97}
98
99impl BaseRunManager {
100 #[allow(clippy::too_many_arguments)]
102 pub fn new(
103 run_id: Uuid,
104 handlers: Vec<Arc<dyn BaseCallbackHandler>>,
105 inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
106 parent_run_id: Option<Uuid>,
107 tags: Option<Vec<String>>,
108 inheritable_tags: Option<Vec<String>>,
109 metadata: Option<HashMap<String, serde_json::Value>>,
110 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
111 ) -> Self {
112 Self {
113 run_id,
114 handlers,
115 inheritable_handlers,
116 parent_run_id,
117 tags: tags.unwrap_or_default(),
118 inheritable_tags: inheritable_tags.unwrap_or_default(),
119 metadata: metadata.unwrap_or_default(),
120 inheritable_metadata: inheritable_metadata.unwrap_or_default(),
121 }
122 }
123
124 pub fn get_noop_manager() -> Self {
126 Self {
127 run_id: uuid7(None),
128 handlers: Vec::new(),
129 inheritable_handlers: Vec::new(),
130 parent_run_id: None,
131 tags: Vec::new(),
132 inheritable_tags: Vec::new(),
133 metadata: HashMap::new(),
134 inheritable_metadata: HashMap::new(),
135 }
136 }
137}
138
139#[derive(Debug, Clone)]
141pub struct RunManager {
142 inner: BaseRunManager,
144}
145
146impl RunManager {
147 #[allow(clippy::too_many_arguments)]
149 pub fn new(
150 run_id: Uuid,
151 handlers: Vec<Arc<dyn BaseCallbackHandler>>,
152 inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
153 parent_run_id: Option<Uuid>,
154 tags: Option<Vec<String>>,
155 inheritable_tags: Option<Vec<String>>,
156 metadata: Option<HashMap<String, serde_json::Value>>,
157 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
158 ) -> Self {
159 Self {
160 inner: BaseRunManager::new(
161 run_id,
162 handlers,
163 inheritable_handlers,
164 parent_run_id,
165 tags,
166 inheritable_tags,
167 metadata,
168 inheritable_metadata,
169 ),
170 }
171 }
172
173 pub fn run_id(&self) -> Uuid {
175 self.inner.run_id
176 }
177
178 pub fn parent_run_id(&self) -> Option<Uuid> {
180 self.inner.parent_run_id
181 }
182
183 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
185 &self.inner.handlers
186 }
187
188 pub fn tags(&self) -> &[String] {
190 &self.inner.tags
191 }
192
193 pub fn on_text(&self, text: &str) {
195 if self.inner.handlers.is_empty() {
196 return;
197 }
198 let run_id = self.inner.run_id;
199 let parent_run_id = self.inner.parent_run_id;
200 let tags = self.inner.tags.clone();
201 handle_event(&self.inner.handlers, None, |_handler| {
202 let _ = (text, run_id, parent_run_id, &tags);
203 });
204 }
205
206 pub fn on_retry(&self, retry_state: &serde_json::Value) {
208 if self.inner.handlers.is_empty() {
209 return;
210 }
211 let run_id = self.inner.run_id;
212 let parent_run_id = self.inner.parent_run_id;
213 let tags = self.inner.tags.clone();
214 handle_event(
215 &self.inner.handlers,
216 Some(|h: &dyn BaseCallbackHandler| h.ignore_retry()),
217 |_handler| {
218 let _ = (retry_state, run_id, parent_run_id, &tags);
219 },
220 );
221 }
222
223 pub fn get_noop_manager() -> Self {
225 Self {
226 inner: BaseRunManager::get_noop_manager(),
227 }
228 }
229}
230
231#[derive(Debug, Clone)]
235pub struct AsyncRunManager {
236 inner: BaseRunManager,
238}
239
240impl AsyncRunManager {
241 #[allow(clippy::too_many_arguments)]
243 pub fn new(
244 run_id: Uuid,
245 handlers: Vec<Arc<dyn BaseCallbackHandler>>,
246 inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
247 parent_run_id: Option<Uuid>,
248 tags: Option<Vec<String>>,
249 inheritable_tags: Option<Vec<String>>,
250 metadata: Option<HashMap<String, serde_json::Value>>,
251 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
252 ) -> Self {
253 Self {
254 inner: BaseRunManager::new(
255 run_id,
256 handlers,
257 inheritable_handlers,
258 parent_run_id,
259 tags,
260 inheritable_tags,
261 metadata,
262 inheritable_metadata,
263 ),
264 }
265 }
266
267 pub fn get_sync(&self) -> RunManager {
269 RunManager::new(
270 self.inner.run_id,
271 self.inner.handlers.clone(),
272 self.inner.inheritable_handlers.clone(),
273 self.inner.parent_run_id,
274 Some(self.inner.tags.clone()),
275 Some(self.inner.inheritable_tags.clone()),
276 Some(self.inner.metadata.clone()),
277 Some(self.inner.inheritable_metadata.clone()),
278 )
279 }
280
281 pub fn run_id(&self) -> Uuid {
283 self.inner.run_id
284 }
285
286 pub fn parent_run_id(&self) -> Option<Uuid> {
288 self.inner.parent_run_id
289 }
290
291 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
293 &self.inner.handlers
294 }
295
296 pub fn tags(&self) -> &[String] {
298 &self.inner.tags
299 }
300
301 pub async fn on_text(&self, text: &str) {
303 if self.inner.handlers.is_empty() {
304 return;
305 }
306 let run_id = self.inner.run_id;
307 let parent_run_id = self.inner.parent_run_id;
308 let tags = self.inner.tags.clone();
309 ahandle_event(&self.inner.handlers, None, |_handler| {
310 let _ = (text, run_id, parent_run_id, &tags);
311 async {}
312 })
313 .await;
314 }
315
316 pub async fn on_retry(&self, retry_state: &serde_json::Value) {
318 if self.inner.handlers.is_empty() {
319 return;
320 }
321 let run_id = self.inner.run_id;
322 let parent_run_id = self.inner.parent_run_id;
323 let tags = self.inner.tags.clone();
324 ahandle_event(
325 &self.inner.handlers,
326 Some(|h: &dyn BaseCallbackHandler| h.ignore_retry()),
327 |_handler| {
328 let _ = (retry_state, run_id, parent_run_id, &tags);
329 async {}
330 },
331 )
332 .await;
333 }
334
335 pub fn get_noop_manager() -> Self {
337 Self {
338 inner: BaseRunManager::get_noop_manager(),
339 }
340 }
341}
342
343#[derive(Debug, Clone)]
347pub struct AsyncParentRunManager {
348 inner: AsyncRunManager,
350}
351
352impl AsyncParentRunManager {
353 #[allow(clippy::too_many_arguments)]
355 pub fn new(
356 run_id: Uuid,
357 handlers: Vec<Arc<dyn BaseCallbackHandler>>,
358 inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
359 parent_run_id: Option<Uuid>,
360 tags: Option<Vec<String>>,
361 inheritable_tags: Option<Vec<String>>,
362 metadata: Option<HashMap<String, serde_json::Value>>,
363 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
364 ) -> Self {
365 Self {
366 inner: AsyncRunManager::new(
367 run_id,
368 handlers,
369 inheritable_handlers,
370 parent_run_id,
371 tags,
372 inheritable_tags,
373 metadata,
374 inheritable_metadata,
375 ),
376 }
377 }
378
379 pub fn get_child(&self, tag: Option<&str>) -> AsyncCallbackManager {
381 let mut manager = AsyncCallbackManager::new();
382 manager.inner.parent_run_id = Some(self.inner.run_id());
383 manager.set_handlers(self.inner.inner.inheritable_handlers.clone(), true);
384 manager.add_tags(self.inner.inner.inheritable_tags.clone(), true);
385 manager.add_metadata(self.inner.inner.inheritable_metadata.clone(), true);
386 if let Some(tag) = tag {
387 manager.add_tags(vec![tag.to_string()], false);
388 }
389 manager
390 }
391
392 pub fn get_sync(&self) -> ParentRunManager {
394 ParentRunManager::new(
395 self.inner.inner.run_id,
396 self.inner.inner.handlers.clone(),
397 self.inner.inner.inheritable_handlers.clone(),
398 self.inner.inner.parent_run_id,
399 Some(self.inner.inner.tags.clone()),
400 Some(self.inner.inner.inheritable_tags.clone()),
401 Some(self.inner.inner.metadata.clone()),
402 Some(self.inner.inner.inheritable_metadata.clone()),
403 )
404 }
405
406 pub fn run_id(&self) -> Uuid {
408 self.inner.run_id()
409 }
410
411 pub fn parent_run_id(&self) -> Option<Uuid> {
413 self.inner.parent_run_id()
414 }
415
416 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
418 self.inner.handlers()
419 }
420
421 pub fn tags(&self) -> &[String] {
423 self.inner.tags()
424 }
425
426 pub fn get_noop_manager() -> Self {
428 Self {
429 inner: AsyncRunManager::get_noop_manager(),
430 }
431 }
432}
433
434#[derive(Debug, Clone)]
436pub struct ParentRunManager {
437 inner: RunManager,
439}
440
441impl ParentRunManager {
442 #[allow(clippy::too_many_arguments)]
444 pub fn new(
445 run_id: Uuid,
446 handlers: Vec<Arc<dyn BaseCallbackHandler>>,
447 inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
448 parent_run_id: Option<Uuid>,
449 tags: Option<Vec<String>>,
450 inheritable_tags: Option<Vec<String>>,
451 metadata: Option<HashMap<String, serde_json::Value>>,
452 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
453 ) -> Self {
454 Self {
455 inner: RunManager::new(
456 run_id,
457 handlers,
458 inheritable_handlers,
459 parent_run_id,
460 tags,
461 inheritable_tags,
462 metadata,
463 inheritable_metadata,
464 ),
465 }
466 }
467
468 pub fn get_child(&self, tag: Option<&str>) -> CallbackManager {
470 let mut manager = CallbackManager::new();
471 manager.parent_run_id = Some(self.inner.run_id());
472 manager.set_handlers(self.inner.inner.inheritable_handlers.clone(), true);
473 manager.add_tags(self.inner.inner.inheritable_tags.clone(), true);
474 manager.add_metadata(self.inner.inner.inheritable_metadata.clone(), true);
475 if let Some(tag) = tag {
476 manager.add_tags(vec![tag.to_string()], false);
477 }
478 manager
479 }
480
481 pub fn run_id(&self) -> Uuid {
483 self.inner.run_id()
484 }
485
486 pub fn parent_run_id(&self) -> Option<Uuid> {
488 self.inner.parent_run_id()
489 }
490
491 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
493 self.inner.handlers()
494 }
495
496 pub fn tags(&self) -> &[String] {
498 self.inner.tags()
499 }
500
501 pub fn get_noop_manager() -> Self {
503 Self {
504 inner: RunManager::get_noop_manager(),
505 }
506 }
507}
508
509#[derive(Debug, Clone)]
511pub struct CallbackManagerForLLMRun {
512 inner: RunManager,
514}
515
516impl CallbackManagerForLLMRun {
517 #[allow(clippy::too_many_arguments)]
519 pub fn new(
520 run_id: Uuid,
521 handlers: Vec<Arc<dyn BaseCallbackHandler>>,
522 inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
523 parent_run_id: Option<Uuid>,
524 tags: Option<Vec<String>>,
525 inheritable_tags: Option<Vec<String>>,
526 metadata: Option<HashMap<String, serde_json::Value>>,
527 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
528 ) -> Self {
529 Self {
530 inner: RunManager::new(
531 run_id,
532 handlers,
533 inheritable_handlers,
534 parent_run_id,
535 tags,
536 inheritable_tags,
537 metadata,
538 inheritable_metadata,
539 ),
540 }
541 }
542
543 pub fn run_id(&self) -> Uuid {
545 self.inner.run_id()
546 }
547
548 pub fn parent_run_id(&self) -> Option<Uuid> {
550 self.inner.parent_run_id()
551 }
552
553 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
555 self.inner.handlers()
556 }
557
558 pub fn tags(&self) -> &[String] {
560 self.inner.tags()
561 }
562
563 pub fn on_llm_new_token(&self, token: &str, chunk: Option<&serde_json::Value>) {
565 if self.inner.inner.handlers.is_empty() {
566 return;
567 }
568 let run_id = self.inner.run_id();
569 let parent_run_id = self.inner.parent_run_id();
570 let tags = self.inner.tags().to_vec();
571 handle_event(
572 self.inner.handlers(),
573 Some(|h: &dyn BaseCallbackHandler| h.ignore_llm()),
574 |_handler| {
575 let _ = (token, run_id, parent_run_id, chunk, &tags);
576 },
577 );
578 }
579
580 pub fn on_llm_end(&self, response: &ChatResult) {
582 if self.inner.inner.handlers.is_empty() {
583 return;
584 }
585 let run_id = self.inner.run_id();
586 let parent_run_id = self.inner.parent_run_id();
587 let tags = self.inner.tags().to_vec();
588 handle_event(
589 self.inner.handlers(),
590 Some(|h: &dyn BaseCallbackHandler| h.ignore_llm()),
591 |_handler| {
592 let _ = (response, run_id, parent_run_id, &tags);
593 },
594 );
595 }
596
597 pub fn on_llm_error(&self, error: &dyn std::error::Error) {
599 if self.inner.inner.handlers.is_empty() {
600 return;
601 }
602 let run_id = self.inner.run_id();
603 let parent_run_id = self.inner.parent_run_id();
604 let tags = self.inner.tags().to_vec();
605 handle_event(
606 self.inner.handlers(),
607 Some(|h: &dyn BaseCallbackHandler| h.ignore_llm()),
608 |_handler| {
609 let _ = (error, run_id, parent_run_id, &tags);
610 },
611 );
612 }
613
614 pub fn get_noop_manager() -> Self {
616 Self {
617 inner: RunManager::get_noop_manager(),
618 }
619 }
620}
621
622#[derive(Debug, Clone)]
624pub struct CallbackManagerForChainRun {
625 inner: ParentRunManager,
627}
628
629impl CallbackManagerForChainRun {
630 #[allow(clippy::too_many_arguments)]
632 pub fn new(
633 run_id: Uuid,
634 handlers: Vec<Arc<dyn BaseCallbackHandler>>,
635 inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
636 parent_run_id: Option<Uuid>,
637 tags: Option<Vec<String>>,
638 inheritable_tags: Option<Vec<String>>,
639 metadata: Option<HashMap<String, serde_json::Value>>,
640 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
641 ) -> Self {
642 Self {
643 inner: ParentRunManager::new(
644 run_id,
645 handlers,
646 inheritable_handlers,
647 parent_run_id,
648 tags,
649 inheritable_tags,
650 metadata,
651 inheritable_metadata,
652 ),
653 }
654 }
655
656 pub fn run_id(&self) -> Uuid {
658 self.inner.run_id()
659 }
660
661 pub fn parent_run_id(&self) -> Option<Uuid> {
663 self.inner.parent_run_id()
664 }
665
666 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
668 self.inner.handlers()
669 }
670
671 pub fn tags(&self) -> &[String] {
673 self.inner.tags()
674 }
675
676 pub fn get_child(&self, tag: Option<&str>) -> CallbackManager {
678 self.inner.get_child(tag)
679 }
680
681 pub fn on_chain_end(&self, outputs: &HashMap<String, serde_json::Value>) {
683 if self.inner.inner.inner.handlers.is_empty() {
684 return;
685 }
686 let run_id = self.inner.run_id();
687 let parent_run_id = self.inner.parent_run_id();
688 let tags = self.inner.tags().to_vec();
689 handle_event(
690 self.inner.handlers(),
691 Some(|h: &dyn BaseCallbackHandler| h.ignore_chain()),
692 |_handler| {
693 let _ = (outputs, run_id, parent_run_id, &tags);
694 },
695 );
696 }
697
698 pub fn on_chain_error(&self, error: &dyn std::error::Error) {
700 if self.inner.inner.inner.handlers.is_empty() {
701 return;
702 }
703 let run_id = self.inner.run_id();
704 let parent_run_id = self.inner.parent_run_id();
705 let tags = self.inner.tags().to_vec();
706 handle_event(
707 self.inner.handlers(),
708 Some(|h: &dyn BaseCallbackHandler| h.ignore_chain()),
709 |_handler| {
710 let _ = (error, run_id, parent_run_id, &tags);
711 },
712 );
713 }
714
715 pub fn on_agent_action(&self, action: &serde_json::Value) {
717 if self.inner.inner.inner.handlers.is_empty() {
718 return;
719 }
720 let run_id = self.inner.run_id();
721 let parent_run_id = self.inner.parent_run_id();
722 let tags = self.inner.tags().to_vec();
723 handle_event(
724 self.inner.handlers(),
725 Some(|h: &dyn BaseCallbackHandler| h.ignore_agent()),
726 |_handler| {
727 let _ = (action, run_id, parent_run_id, &tags);
728 },
729 );
730 }
731
732 pub fn on_agent_finish(&self, finish: &serde_json::Value) {
734 if self.inner.inner.inner.handlers.is_empty() {
735 return;
736 }
737 let run_id = self.inner.run_id();
738 let parent_run_id = self.inner.parent_run_id();
739 let tags = self.inner.tags().to_vec();
740 handle_event(
741 self.inner.handlers(),
742 Some(|h: &dyn BaseCallbackHandler| h.ignore_agent()),
743 |_handler| {
744 let _ = (finish, run_id, parent_run_id, &tags);
745 },
746 );
747 }
748
749 pub fn get_noop_manager() -> Self {
751 Self {
752 inner: ParentRunManager::get_noop_manager(),
753 }
754 }
755}
756
757#[derive(Debug, Clone)]
759pub struct CallbackManagerForToolRun {
760 inner: ParentRunManager,
762}
763
764impl CallbackManagerForToolRun {
765 #[allow(clippy::too_many_arguments)]
767 pub fn new(
768 run_id: Uuid,
769 handlers: Vec<Arc<dyn BaseCallbackHandler>>,
770 inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
771 parent_run_id: Option<Uuid>,
772 tags: Option<Vec<String>>,
773 inheritable_tags: Option<Vec<String>>,
774 metadata: Option<HashMap<String, serde_json::Value>>,
775 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
776 ) -> Self {
777 Self {
778 inner: ParentRunManager::new(
779 run_id,
780 handlers,
781 inheritable_handlers,
782 parent_run_id,
783 tags,
784 inheritable_tags,
785 metadata,
786 inheritable_metadata,
787 ),
788 }
789 }
790
791 pub fn run_id(&self) -> Uuid {
793 self.inner.run_id()
794 }
795
796 pub fn parent_run_id(&self) -> Option<Uuid> {
798 self.inner.parent_run_id()
799 }
800
801 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
803 self.inner.handlers()
804 }
805
806 pub fn tags(&self) -> &[String] {
808 self.inner.tags()
809 }
810
811 pub fn get_child(&self, tag: Option<&str>) -> CallbackManager {
813 self.inner.get_child(tag)
814 }
815
816 pub fn on_tool_end(&self, output: &str) {
818 if self.inner.inner.inner.handlers.is_empty() {
819 return;
820 }
821 let run_id = self.inner.run_id();
822 let parent_run_id = self.inner.parent_run_id();
823 let tags = self.inner.tags().to_vec();
824 handle_event(
825 self.inner.handlers(),
826 Some(|h: &dyn BaseCallbackHandler| h.ignore_agent()),
827 |_handler| {
828 let _ = (output, run_id, parent_run_id, &tags);
829 },
830 );
831 }
832
833 pub fn on_tool_error(&self, error: &dyn std::error::Error) {
835 if self.inner.inner.inner.handlers.is_empty() {
836 return;
837 }
838 let run_id = self.inner.run_id();
839 let parent_run_id = self.inner.parent_run_id();
840 let tags = self.inner.tags().to_vec();
841 handle_event(
842 self.inner.handlers(),
843 Some(|h: &dyn BaseCallbackHandler| h.ignore_agent()),
844 |_handler| {
845 let _ = (error, run_id, parent_run_id, &tags);
846 },
847 );
848 }
849
850 pub fn get_noop_manager() -> Self {
852 Self {
853 inner: ParentRunManager::get_noop_manager(),
854 }
855 }
856}
857
858#[derive(Debug, Clone)]
860pub struct CallbackManagerForRetrieverRun {
861 inner: ParentRunManager,
863}
864
865impl CallbackManagerForRetrieverRun {
866 #[allow(clippy::too_many_arguments)]
868 pub fn new(
869 run_id: Uuid,
870 handlers: Vec<Arc<dyn BaseCallbackHandler>>,
871 inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
872 parent_run_id: Option<Uuid>,
873 tags: Option<Vec<String>>,
874 inheritable_tags: Option<Vec<String>>,
875 metadata: Option<HashMap<String, serde_json::Value>>,
876 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
877 ) -> Self {
878 Self {
879 inner: ParentRunManager::new(
880 run_id,
881 handlers,
882 inheritable_handlers,
883 parent_run_id,
884 tags,
885 inheritable_tags,
886 metadata,
887 inheritable_metadata,
888 ),
889 }
890 }
891
892 pub fn run_id(&self) -> Uuid {
894 self.inner.run_id()
895 }
896
897 pub fn parent_run_id(&self) -> Option<Uuid> {
899 self.inner.parent_run_id()
900 }
901
902 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
904 self.inner.handlers()
905 }
906
907 pub fn tags(&self) -> &[String] {
909 self.inner.tags()
910 }
911
912 pub fn get_child(&self, tag: Option<&str>) -> CallbackManager {
914 self.inner.get_child(tag)
915 }
916
917 pub fn on_retriever_end(&self, documents: &[serde_json::Value]) {
919 if self.inner.inner.inner.handlers.is_empty() {
920 return;
921 }
922 let run_id = self.inner.run_id();
923 let parent_run_id = self.inner.parent_run_id();
924 let tags = self.inner.tags().to_vec();
925 handle_event(
926 self.inner.handlers(),
927 Some(|h: &dyn BaseCallbackHandler| h.ignore_retriever()),
928 |_handler| {
929 let _ = (documents, run_id, parent_run_id, &tags);
930 },
931 );
932 }
933
934 pub fn on_retriever_error(&self, error: &dyn std::error::Error) {
936 if self.inner.inner.inner.handlers.is_empty() {
937 return;
938 }
939 let run_id = self.inner.run_id();
940 let parent_run_id = self.inner.parent_run_id();
941 let tags = self.inner.tags().to_vec();
942 handle_event(
943 self.inner.handlers(),
944 Some(|h: &dyn BaseCallbackHandler| h.ignore_retriever()),
945 |_handler| {
946 let _ = (error, run_id, parent_run_id, &tags);
947 },
948 );
949 }
950
951 pub fn get_noop_manager() -> Self {
953 Self {
954 inner: ParentRunManager::get_noop_manager(),
955 }
956 }
957}
958
959#[derive(Debug, Clone, Default)]
961pub struct CallbackManager {
962 pub handlers: Vec<Arc<dyn BaseCallbackHandler>>,
964 pub inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
966 pub parent_run_id: Option<Uuid>,
968 pub tags: Vec<String>,
970 pub inheritable_tags: Vec<String>,
972 pub metadata: HashMap<String, serde_json::Value>,
974 pub inheritable_metadata: HashMap<String, serde_json::Value>,
976}
977
978impl CallbackManager {
979 pub fn new() -> Self {
981 Self::default()
982 }
983
984 pub fn from_base(base: BaseCallbackManager) -> Self {
986 Self {
987 handlers: base.handlers,
988 inheritable_handlers: base.inheritable_handlers,
989 parent_run_id: base.parent_run_id,
990 tags: base.tags,
991 inheritable_tags: base.inheritable_tags,
992 metadata: base.metadata,
993 inheritable_metadata: base.inheritable_metadata,
994 }
995 }
996
997 pub fn set_handlers(&mut self, handlers: Vec<Arc<dyn BaseCallbackHandler>>, inherit: bool) {
999 self.handlers = handlers.clone();
1000 if inherit {
1001 self.inheritable_handlers = handlers;
1002 }
1003 }
1004
1005 pub fn add_handler(&mut self, handler: Arc<dyn BaseCallbackHandler>, inherit: bool) {
1007 if !self
1008 .handlers
1009 .iter()
1010 .any(|h| std::ptr::eq(h.as_ref(), handler.as_ref()))
1011 {
1012 self.handlers.push(handler.clone());
1013 }
1014 if inherit
1015 && !self
1016 .inheritable_handlers
1017 .iter()
1018 .any(|h| std::ptr::eq(h.as_ref(), handler.as_ref()))
1019 {
1020 self.inheritable_handlers.push(handler);
1021 }
1022 }
1023
1024 pub fn add_tags(&mut self, tags: Vec<String>, inherit: bool) {
1026 for tag in &tags {
1027 if !self.tags.contains(tag) {
1028 self.tags.push(tag.clone());
1029 }
1030 }
1031 if inherit {
1032 for tag in tags {
1033 if !self.inheritable_tags.contains(&tag) {
1034 self.inheritable_tags.push(tag);
1035 }
1036 }
1037 }
1038 }
1039
1040 pub fn add_metadata(&mut self, metadata: HashMap<String, serde_json::Value>, inherit: bool) {
1042 self.metadata.extend(metadata.clone());
1043 if inherit {
1044 self.inheritable_metadata.extend(metadata);
1045 }
1046 }
1047
1048 pub fn on_llm_start(
1050 &self,
1051 serialized: &HashMap<String, serde_json::Value>,
1052 prompts: &[String],
1053 run_id: Option<Uuid>,
1054 ) -> Vec<CallbackManagerForLLMRun> {
1055 let mut managers = Vec::new();
1056
1057 for (i, _prompt) in prompts.iter().enumerate() {
1058 let run_id = if i == 0
1059 && let Some(run_id) = run_id
1060 {
1061 run_id
1062 } else {
1063 uuid7(None)
1064 };
1065
1066 handle_event(
1067 &self.handlers,
1068 Some(|h: &dyn BaseCallbackHandler| h.ignore_llm()),
1069 |_handler| {
1070 let _ = (serialized, run_id);
1071 },
1072 );
1073
1074 managers.push(CallbackManagerForLLMRun::new(
1075 run_id,
1076 self.handlers.clone(),
1077 self.inheritable_handlers.clone(),
1078 self.parent_run_id,
1079 Some(self.tags.clone()),
1080 Some(self.inheritable_tags.clone()),
1081 Some(self.metadata.clone()),
1082 Some(self.inheritable_metadata.clone()),
1083 ));
1084 }
1085
1086 managers
1087 }
1088
1089 pub fn on_chat_model_start(
1091 &self,
1092 serialized: &HashMap<String, serde_json::Value>,
1093 messages: &[Vec<BaseMessage>],
1094 run_id: Option<Uuid>,
1095 ) -> Vec<CallbackManagerForLLMRun> {
1096 let mut managers = Vec::new();
1097 let mut current_run_id = run_id;
1098
1099 for _message_list in messages {
1100 let run_id = current_run_id.unwrap_or_else(|| uuid7(None));
1101 current_run_id = None;
1102
1103 handle_event(
1104 &self.handlers,
1105 Some(|h: &dyn BaseCallbackHandler| h.ignore_chat_model()),
1106 |_handler| {
1107 let _ = (serialized, run_id);
1108 },
1109 );
1110
1111 managers.push(CallbackManagerForLLMRun::new(
1112 run_id,
1113 self.handlers.clone(),
1114 self.inheritable_handlers.clone(),
1115 self.parent_run_id,
1116 Some(self.tags.clone()),
1117 Some(self.inheritable_tags.clone()),
1118 Some(self.metadata.clone()),
1119 Some(self.inheritable_metadata.clone()),
1120 ));
1121 }
1122
1123 managers
1124 }
1125
1126 pub fn on_chain_start(
1128 &self,
1129 serialized: &HashMap<String, serde_json::Value>,
1130 inputs: &HashMap<String, serde_json::Value>,
1131 run_id: Option<Uuid>,
1132 ) -> CallbackManagerForChainRun {
1133 let run_id = run_id.unwrap_or_else(|| uuid7(None));
1134
1135 handle_event(
1136 &self.handlers,
1137 Some(|h: &dyn BaseCallbackHandler| h.ignore_chain()),
1138 |_handler| {
1139 let _ = (serialized, inputs, run_id);
1140 },
1141 );
1142
1143 CallbackManagerForChainRun::new(
1144 run_id,
1145 self.handlers.clone(),
1146 self.inheritable_handlers.clone(),
1147 self.parent_run_id,
1148 Some(self.tags.clone()),
1149 Some(self.inheritable_tags.clone()),
1150 Some(self.metadata.clone()),
1151 Some(self.inheritable_metadata.clone()),
1152 )
1153 }
1154
1155 pub fn on_tool_start(
1157 &self,
1158 serialized: &HashMap<String, serde_json::Value>,
1159 input_str: &str,
1160 run_id: Option<Uuid>,
1161 inputs: Option<&HashMap<String, serde_json::Value>>,
1162 ) -> CallbackManagerForToolRun {
1163 let run_id = run_id.unwrap_or_else(|| uuid7(None));
1164
1165 handle_event(
1166 &self.handlers,
1167 Some(|h: &dyn BaseCallbackHandler| h.ignore_agent()),
1168 |_handler| {
1169 let _ = (serialized, input_str, run_id, inputs);
1170 },
1171 );
1172
1173 CallbackManagerForToolRun::new(
1174 run_id,
1175 self.handlers.clone(),
1176 self.inheritable_handlers.clone(),
1177 self.parent_run_id,
1178 Some(self.tags.clone()),
1179 Some(self.inheritable_tags.clone()),
1180 Some(self.metadata.clone()),
1181 Some(self.inheritable_metadata.clone()),
1182 )
1183 }
1184
1185 pub fn on_retriever_start(
1187 &self,
1188 serialized: &HashMap<String, serde_json::Value>,
1189 query: &str,
1190 run_id: Option<Uuid>,
1191 ) -> CallbackManagerForRetrieverRun {
1192 let run_id = run_id.unwrap_or_else(|| uuid7(None));
1193
1194 handle_event(
1195 &self.handlers,
1196 Some(|h: &dyn BaseCallbackHandler| h.ignore_retriever()),
1197 |_handler| {
1198 let _ = (serialized, query, run_id);
1199 },
1200 );
1201
1202 CallbackManagerForRetrieverRun::new(
1203 run_id,
1204 self.handlers.clone(),
1205 self.inheritable_handlers.clone(),
1206 self.parent_run_id,
1207 Some(self.tags.clone()),
1208 Some(self.inheritable_tags.clone()),
1209 Some(self.metadata.clone()),
1210 Some(self.inheritable_metadata.clone()),
1211 )
1212 }
1213
1214 pub fn on_custom_event(&self, name: &str, data: &serde_json::Value, run_id: Option<Uuid>) {
1216 if self.handlers.is_empty() {
1217 return;
1218 }
1219
1220 let run_id = run_id.unwrap_or_else(|| uuid7(None));
1221
1222 handle_event(
1223 &self.handlers,
1224 Some(|h: &dyn BaseCallbackHandler| h.ignore_custom_event()),
1225 |_handler| {
1226 let _ = (name, data, run_id);
1227 },
1228 );
1229 }
1230
1231 pub fn configure(
1233 inheritable_callbacks: Option<Callbacks>,
1234 local_callbacks: Option<Callbacks>,
1235 inheritable_tags: Option<Vec<String>>,
1236 local_tags: Option<Vec<String>>,
1237 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
1238 local_metadata: Option<HashMap<String, serde_json::Value>>,
1239 _verbose: bool,
1240 ) -> Self {
1241 let mut callback_manager = Self::new();
1242
1243 if let Some(callbacks) = inheritable_callbacks {
1244 match callbacks {
1245 Callbacks::Handlers(handlers) => {
1246 callback_manager.handlers = handlers.clone();
1247 callback_manager.inheritable_handlers = handlers;
1248 }
1249 Callbacks::Manager(manager) => {
1250 callback_manager.handlers = manager.handlers.clone();
1251 callback_manager.inheritable_handlers = manager.inheritable_handlers.clone();
1252 callback_manager.parent_run_id = manager.parent_run_id;
1253 callback_manager.tags = manager.tags.clone();
1254 callback_manager.inheritable_tags = manager.inheritable_tags.clone();
1255 callback_manager.metadata = manager.metadata.clone();
1256 callback_manager.inheritable_metadata = manager.inheritable_metadata.clone();
1257 }
1258 }
1259 }
1260
1261 if let Some(callbacks) = local_callbacks {
1262 match callbacks {
1263 Callbacks::Handlers(handlers) => {
1264 for handler in handlers {
1265 callback_manager.add_handler(handler, false);
1266 }
1267 }
1268 Callbacks::Manager(manager) => {
1269 for handler in manager.handlers {
1270 callback_manager.add_handler(handler, false);
1271 }
1272 }
1273 }
1274 }
1275
1276 if let Some(tags) = inheritable_tags {
1277 callback_manager.add_tags(tags, true);
1278 }
1279 if let Some(tags) = local_tags {
1280 callback_manager.add_tags(tags, false);
1281 }
1282 if let Some(metadata) = inheritable_metadata {
1283 callback_manager.add_metadata(metadata, true);
1284 }
1285 if let Some(metadata) = local_metadata {
1286 callback_manager.add_metadata(metadata, false);
1287 }
1288
1289 callback_manager
1290 }
1291}
1292
1293#[derive(Debug, Clone, Default)]
1295pub struct AsyncCallbackManager {
1296 inner: CallbackManager,
1298}
1299
1300impl AsyncCallbackManager {
1301 pub fn new() -> Self {
1303 Self::default()
1304 }
1305
1306 pub fn from_callback_manager(manager: CallbackManager) -> Self {
1308 Self { inner: manager }
1309 }
1310
1311 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1313 &self.inner.handlers
1314 }
1315
1316 pub fn parent_run_id(&self) -> Option<Uuid> {
1318 self.inner.parent_run_id
1319 }
1320
1321 pub fn set_handlers(&mut self, handlers: Vec<Arc<dyn BaseCallbackHandler>>, inherit: bool) {
1323 self.inner.set_handlers(handlers, inherit);
1324 }
1325
1326 pub fn add_handler(&mut self, handler: Arc<dyn BaseCallbackHandler>, inherit: bool) {
1328 self.inner.add_handler(handler, inherit);
1329 }
1330
1331 pub fn add_tags(&mut self, tags: Vec<String>, inherit: bool) {
1333 self.inner.add_tags(tags, inherit);
1334 }
1335
1336 pub fn add_metadata(&mut self, metadata: HashMap<String, serde_json::Value>, inherit: bool) {
1338 self.inner.add_metadata(metadata, inherit);
1339 }
1340
1341 pub fn is_async(&self) -> bool {
1343 true
1344 }
1345
1346 pub async fn on_llm_start(
1348 &self,
1349 serialized: &HashMap<String, serde_json::Value>,
1350 prompts: &[String],
1351 run_id: Option<Uuid>,
1352 ) -> Vec<AsyncCallbackManagerForLLMRun> {
1353 self.inner
1354 .on_llm_start(serialized, prompts, run_id)
1355 .into_iter()
1356 .map(AsyncCallbackManagerForLLMRun::from_sync)
1357 .collect()
1358 }
1359
1360 pub async fn on_chat_model_start(
1362 &self,
1363 serialized: &HashMap<String, serde_json::Value>,
1364 messages: &[Vec<BaseMessage>],
1365 run_id: Option<Uuid>,
1366 ) -> Vec<AsyncCallbackManagerForLLMRun> {
1367 self.inner
1368 .on_chat_model_start(serialized, messages, run_id)
1369 .into_iter()
1370 .map(AsyncCallbackManagerForLLMRun::from_sync)
1371 .collect()
1372 }
1373
1374 pub async fn on_chain_start(
1376 &self,
1377 serialized: &HashMap<String, serde_json::Value>,
1378 inputs: &HashMap<String, serde_json::Value>,
1379 run_id: Option<Uuid>,
1380 ) -> AsyncCallbackManagerForChainRun {
1381 AsyncCallbackManagerForChainRun::from_sync(
1382 self.inner.on_chain_start(serialized, inputs, run_id),
1383 )
1384 }
1385
1386 pub async fn on_tool_start(
1388 &self,
1389 serialized: &HashMap<String, serde_json::Value>,
1390 input_str: &str,
1391 run_id: Option<Uuid>,
1392 inputs: Option<&HashMap<String, serde_json::Value>>,
1393 ) -> AsyncCallbackManagerForToolRun {
1394 AsyncCallbackManagerForToolRun::from_sync(
1395 self.inner
1396 .on_tool_start(serialized, input_str, run_id, inputs),
1397 )
1398 }
1399
1400 pub async fn on_retriever_start(
1402 &self,
1403 serialized: &HashMap<String, serde_json::Value>,
1404 query: &str,
1405 run_id: Option<Uuid>,
1406 ) -> AsyncCallbackManagerForRetrieverRun {
1407 AsyncCallbackManagerForRetrieverRun::from_sync(
1408 self.inner.on_retriever_start(serialized, query, run_id),
1409 )
1410 }
1411
1412 pub async fn on_custom_event(
1414 &self,
1415 name: &str,
1416 data: &serde_json::Value,
1417 run_id: Option<Uuid>,
1418 ) {
1419 if self.inner.handlers.is_empty() {
1420 return;
1421 }
1422
1423 let run_id = run_id.unwrap_or_else(|| uuid7(None));
1424
1425 handle_event(
1426 &self.inner.handlers,
1427 Some(|h: &dyn BaseCallbackHandler| h.ignore_custom_event()),
1428 |_handler| {
1429 let _ = (name, data, run_id);
1430 },
1431 );
1432 }
1433
1434 pub fn configure(
1436 inheritable_callbacks: Option<Callbacks>,
1437 local_callbacks: Option<Callbacks>,
1438 inheritable_tags: Option<Vec<String>>,
1439 local_tags: Option<Vec<String>>,
1440 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
1441 local_metadata: Option<HashMap<String, serde_json::Value>>,
1442 verbose: bool,
1443 ) -> Self {
1444 Self {
1445 inner: CallbackManager::configure(
1446 inheritable_callbacks,
1447 local_callbacks,
1448 inheritable_tags,
1449 local_tags,
1450 inheritable_metadata,
1451 local_metadata,
1452 verbose,
1453 ),
1454 }
1455 }
1456}
1457
1458#[derive(Debug, Clone)]
1460pub struct AsyncCallbackManagerForLLMRun {
1461 inner: CallbackManagerForLLMRun,
1463}
1464
1465impl AsyncCallbackManagerForLLMRun {
1466 pub fn from_sync(inner: CallbackManagerForLLMRun) -> Self {
1468 Self { inner }
1469 }
1470
1471 pub fn get_sync(&self) -> CallbackManagerForLLMRun {
1473 self.inner.clone()
1474 }
1475
1476 pub fn run_id(&self) -> Uuid {
1478 self.inner.run_id()
1479 }
1480
1481 pub fn parent_run_id(&self) -> Option<Uuid> {
1483 self.inner.parent_run_id()
1484 }
1485
1486 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1488 self.inner.handlers()
1489 }
1490
1491 pub async fn on_llm_new_token(&self, token: &str, chunk: Option<&serde_json::Value>) {
1493 self.inner.on_llm_new_token(token, chunk);
1494 }
1495
1496 pub async fn on_llm_end(&self, response: &ChatResult) {
1498 self.inner.on_llm_end(response);
1499 }
1500
1501 pub async fn on_llm_error(&self, error: &dyn std::error::Error) {
1503 self.inner.on_llm_error(error);
1504 }
1505
1506 pub fn get_noop_manager() -> Self {
1508 Self {
1509 inner: CallbackManagerForLLMRun::get_noop_manager(),
1510 }
1511 }
1512}
1513
1514#[derive(Debug, Clone)]
1516pub struct AsyncCallbackManagerForChainRun {
1517 inner: CallbackManagerForChainRun,
1519}
1520
1521impl AsyncCallbackManagerForChainRun {
1522 pub fn from_sync(inner: CallbackManagerForChainRun) -> Self {
1524 Self { inner }
1525 }
1526
1527 pub fn get_sync(&self) -> CallbackManagerForChainRun {
1529 self.inner.clone()
1530 }
1531
1532 pub fn run_id(&self) -> Uuid {
1534 self.inner.run_id()
1535 }
1536
1537 pub fn parent_run_id(&self) -> Option<Uuid> {
1539 self.inner.parent_run_id()
1540 }
1541
1542 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1544 self.inner.handlers()
1545 }
1546
1547 pub fn get_child(&self, tag: Option<&str>) -> AsyncCallbackManager {
1549 AsyncCallbackManager::from_callback_manager(self.inner.get_child(tag))
1550 }
1551
1552 pub async fn on_chain_end(&self, outputs: &HashMap<String, serde_json::Value>) {
1554 self.inner.on_chain_end(outputs);
1555 }
1556
1557 pub async fn on_chain_error(&self, error: &dyn std::error::Error) {
1559 self.inner.on_chain_error(error);
1560 }
1561
1562 pub async fn on_agent_action(&self, action: &serde_json::Value) {
1564 self.inner.on_agent_action(action);
1565 }
1566
1567 pub async fn on_agent_finish(&self, finish: &serde_json::Value) {
1569 self.inner.on_agent_finish(finish);
1570 }
1571
1572 pub fn get_noop_manager() -> Self {
1574 Self {
1575 inner: CallbackManagerForChainRun::get_noop_manager(),
1576 }
1577 }
1578}
1579
1580#[derive(Debug, Clone)]
1582pub struct AsyncCallbackManagerForToolRun {
1583 inner: CallbackManagerForToolRun,
1585}
1586
1587impl AsyncCallbackManagerForToolRun {
1588 pub fn from_sync(inner: CallbackManagerForToolRun) -> Self {
1590 Self { inner }
1591 }
1592
1593 pub fn get_sync(&self) -> CallbackManagerForToolRun {
1595 self.inner.clone()
1596 }
1597
1598 pub fn run_id(&self) -> Uuid {
1600 self.inner.run_id()
1601 }
1602
1603 pub fn parent_run_id(&self) -> Option<Uuid> {
1605 self.inner.parent_run_id()
1606 }
1607
1608 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1610 self.inner.handlers()
1611 }
1612
1613 pub fn get_child(&self, tag: Option<&str>) -> AsyncCallbackManager {
1615 AsyncCallbackManager::from_callback_manager(self.inner.get_child(tag))
1616 }
1617
1618 pub async fn on_tool_end(&self, output: &str) {
1620 self.inner.on_tool_end(output);
1621 }
1622
1623 pub async fn on_tool_error(&self, error: &dyn std::error::Error) {
1625 self.inner.on_tool_error(error);
1626 }
1627
1628 pub fn get_noop_manager() -> Self {
1630 Self {
1631 inner: CallbackManagerForToolRun::get_noop_manager(),
1632 }
1633 }
1634}
1635
1636#[derive(Debug, Clone)]
1638pub struct AsyncCallbackManagerForRetrieverRun {
1639 inner: CallbackManagerForRetrieverRun,
1641}
1642
1643impl AsyncCallbackManagerForRetrieverRun {
1644 pub fn from_sync(inner: CallbackManagerForRetrieverRun) -> Self {
1646 Self { inner }
1647 }
1648
1649 pub fn get_sync(&self) -> CallbackManagerForRetrieverRun {
1651 self.inner.clone()
1652 }
1653
1654 pub fn run_id(&self) -> Uuid {
1656 self.inner.run_id()
1657 }
1658
1659 pub fn parent_run_id(&self) -> Option<Uuid> {
1661 self.inner.parent_run_id()
1662 }
1663
1664 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1666 self.inner.handlers()
1667 }
1668
1669 pub fn get_child(&self, tag: Option<&str>) -> AsyncCallbackManager {
1671 AsyncCallbackManager::from_callback_manager(self.inner.get_child(tag))
1672 }
1673
1674 pub async fn on_retriever_end(&self, documents: &[serde_json::Value]) {
1676 self.inner.on_retriever_end(documents);
1677 }
1678
1679 pub async fn on_retriever_error(&self, error: &dyn std::error::Error) {
1681 self.inner.on_retriever_error(error);
1682 }
1683
1684 pub fn get_noop_manager() -> Self {
1686 Self {
1687 inner: CallbackManagerForRetrieverRun::get_noop_manager(),
1688 }
1689 }
1690}
1691
1692#[cfg(test)]
1693mod tests {
1694 use super::*;
1695
1696 #[test]
1697 fn test_callback_manager_on_chain_start() {
1698 let manager = CallbackManager::new();
1699 let run_manager = manager.on_chain_start(&HashMap::new(), &HashMap::new(), None);
1700
1701 assert!(!run_manager.run_id().is_nil());
1702 }
1703
1704 #[test]
1705 fn test_callback_manager_configure() {
1706 let manager = CallbackManager::configure(
1707 None,
1708 None,
1709 Some(vec!["tag1".to_string()]),
1710 Some(vec!["tag2".to_string()]),
1711 None,
1712 None,
1713 false,
1714 );
1715
1716 assert!(manager.tags.contains(&"tag1".to_string()));
1717 assert!(manager.tags.contains(&"tag2".to_string()));
1718 assert!(manager.inheritable_tags.contains(&"tag1".to_string()));
1719 assert!(!manager.inheritable_tags.contains(&"tag2".to_string()));
1720 }
1721}
1722
1723#[derive(Debug, Clone)]
1728pub struct CallbackManagerForChainGroup {
1729 inner: CallbackManager,
1731 parent_run_manager: CallbackManagerForChainRun,
1733 pub ended: bool,
1735}
1736
1737impl CallbackManagerForChainGroup {
1738 #[allow(clippy::too_many_arguments)]
1740 pub fn new(
1741 handlers: Vec<Arc<dyn BaseCallbackHandler>>,
1742 inheritable_handlers: Option<Vec<Arc<dyn BaseCallbackHandler>>>,
1743 parent_run_id: Option<Uuid>,
1744 parent_run_manager: CallbackManagerForChainRun,
1745 tags: Option<Vec<String>>,
1746 inheritable_tags: Option<Vec<String>>,
1747 metadata: Option<HashMap<String, serde_json::Value>>,
1748 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
1749 ) -> Self {
1750 let mut inner = CallbackManager::new();
1751 inner.handlers = handlers;
1752 inner.inheritable_handlers = inheritable_handlers.unwrap_or_default();
1753 inner.parent_run_id = parent_run_id;
1754 inner.tags = tags.unwrap_or_default();
1755 inner.inheritable_tags = inheritable_tags.unwrap_or_default();
1756 inner.metadata = metadata.unwrap_or_default();
1757 inner.inheritable_metadata = inheritable_metadata.unwrap_or_default();
1758
1759 Self {
1760 inner,
1761 parent_run_manager,
1762 ended: false,
1763 }
1764 }
1765
1766 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1768 &self.inner.handlers
1769 }
1770
1771 pub fn parent_run_id(&self) -> Option<Uuid> {
1773 self.inner.parent_run_id
1774 }
1775
1776 pub fn tags(&self) -> &[String] {
1778 &self.inner.tags
1779 }
1780
1781 pub fn copy(&self) -> Self {
1783 Self {
1784 inner: self.inner.clone(),
1785 parent_run_manager: self.parent_run_manager.clone(),
1786 ended: self.ended,
1787 }
1788 }
1789
1790 pub fn merge(&self, other: &CallbackManager) -> Self {
1792 let mut merged_inner = self.inner.clone();
1793
1794 for tag in &other.tags {
1796 if !merged_inner.tags.contains(tag) {
1797 merged_inner.tags.push(tag.clone());
1798 }
1799 }
1800 for tag in &other.inheritable_tags {
1801 if !merged_inner.inheritable_tags.contains(tag) {
1802 merged_inner.inheritable_tags.push(tag.clone());
1803 }
1804 }
1805
1806 merged_inner.metadata.extend(other.metadata.clone());
1808
1809 for handler in &other.handlers {
1811 merged_inner.add_handler(handler.clone(), false);
1812 }
1813
1814 Self {
1815 inner: merged_inner,
1816 parent_run_manager: self.parent_run_manager.clone(),
1817 ended: self.ended,
1818 }
1819 }
1820
1821 pub fn set_handlers(&mut self, handlers: Vec<Arc<dyn BaseCallbackHandler>>, inherit: bool) {
1823 self.inner.set_handlers(handlers, inherit);
1824 }
1825
1826 pub fn add_handler(&mut self, handler: Arc<dyn BaseCallbackHandler>, inherit: bool) {
1828 self.inner.add_handler(handler, inherit);
1829 }
1830
1831 pub fn add_tags(&mut self, tags: Vec<String>, inherit: bool) {
1833 self.inner.add_tags(tags, inherit);
1834 }
1835
1836 pub fn add_metadata(&mut self, metadata: HashMap<String, serde_json::Value>, inherit: bool) {
1838 self.inner.add_metadata(metadata, inherit);
1839 }
1840
1841 pub fn on_chain_end(&mut self, outputs: &HashMap<String, serde_json::Value>) {
1843 self.ended = true;
1844 self.parent_run_manager.on_chain_end(outputs);
1845 }
1846
1847 pub fn on_chain_error(&mut self, error: &dyn std::error::Error) {
1849 self.ended = true;
1850 self.parent_run_manager.on_chain_error(error);
1851 }
1852
1853 pub fn on_llm_start(
1855 &self,
1856 serialized: &HashMap<String, serde_json::Value>,
1857 prompts: &[String],
1858 run_id: Option<Uuid>,
1859 ) -> Vec<CallbackManagerForLLMRun> {
1860 self.inner.on_llm_start(serialized, prompts, run_id)
1861 }
1862
1863 pub fn on_chat_model_start(
1865 &self,
1866 serialized: &HashMap<String, serde_json::Value>,
1867 messages: &[Vec<BaseMessage>],
1868 run_id: Option<Uuid>,
1869 ) -> Vec<CallbackManagerForLLMRun> {
1870 self.inner.on_chat_model_start(serialized, messages, run_id)
1871 }
1872
1873 pub fn on_chain_start(
1875 &self,
1876 serialized: &HashMap<String, serde_json::Value>,
1877 inputs: &HashMap<String, serde_json::Value>,
1878 run_id: Option<Uuid>,
1879 ) -> CallbackManagerForChainRun {
1880 self.inner.on_chain_start(serialized, inputs, run_id)
1881 }
1882
1883 pub fn on_tool_start(
1885 &self,
1886 serialized: &HashMap<String, serde_json::Value>,
1887 input_str: &str,
1888 run_id: Option<Uuid>,
1889 inputs: Option<&HashMap<String, serde_json::Value>>,
1890 ) -> CallbackManagerForToolRun {
1891 self.inner
1892 .on_tool_start(serialized, input_str, run_id, inputs)
1893 }
1894
1895 pub fn on_retriever_start(
1897 &self,
1898 serialized: &HashMap<String, serde_json::Value>,
1899 query: &str,
1900 run_id: Option<Uuid>,
1901 ) -> CallbackManagerForRetrieverRun {
1902 self.inner.on_retriever_start(serialized, query, run_id)
1903 }
1904}
1905
1906#[derive(Debug, Clone)]
1908pub struct AsyncCallbackManagerForChainGroup {
1909 inner: AsyncCallbackManager,
1911 parent_run_manager: AsyncCallbackManagerForChainRun,
1913 pub ended: bool,
1915}
1916
1917impl AsyncCallbackManagerForChainGroup {
1918 #[allow(clippy::too_many_arguments)]
1920 pub fn new(
1921 handlers: Vec<Arc<dyn BaseCallbackHandler>>,
1922 inheritable_handlers: Option<Vec<Arc<dyn BaseCallbackHandler>>>,
1923 parent_run_id: Option<Uuid>,
1924 parent_run_manager: AsyncCallbackManagerForChainRun,
1925 tags: Option<Vec<String>>,
1926 inheritable_tags: Option<Vec<String>>,
1927 metadata: Option<HashMap<String, serde_json::Value>>,
1928 inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
1929 ) -> Self {
1930 let mut inner_sync = CallbackManager::new();
1931 inner_sync.handlers = handlers;
1932 inner_sync.inheritable_handlers = inheritable_handlers.unwrap_or_default();
1933 inner_sync.parent_run_id = parent_run_id;
1934 inner_sync.tags = tags.unwrap_or_default();
1935 inner_sync.inheritable_tags = inheritable_tags.unwrap_or_default();
1936 inner_sync.metadata = metadata.unwrap_or_default();
1937 inner_sync.inheritable_metadata = inheritable_metadata.unwrap_or_default();
1938
1939 Self {
1940 inner: AsyncCallbackManager::from_callback_manager(inner_sync),
1941 parent_run_manager,
1942 ended: false,
1943 }
1944 }
1945
1946 pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1948 self.inner.handlers()
1949 }
1950
1951 pub fn parent_run_id(&self) -> Option<Uuid> {
1953 self.inner.parent_run_id()
1954 }
1955
1956 pub fn copy(&self) -> Self {
1958 Self {
1959 inner: self.inner.clone(),
1960 parent_run_manager: self.parent_run_manager.clone(),
1961 ended: self.ended,
1962 }
1963 }
1964
1965 pub fn merge(&self, other: &CallbackManager) -> Self {
1967 let mut inner_sync = self.inner.inner.clone();
1968
1969 for tag in &other.tags {
1971 if !inner_sync.tags.contains(tag) {
1972 inner_sync.tags.push(tag.clone());
1973 }
1974 }
1975 for tag in &other.inheritable_tags {
1976 if !inner_sync.inheritable_tags.contains(tag) {
1977 inner_sync.inheritable_tags.push(tag.clone());
1978 }
1979 }
1980
1981 inner_sync.metadata.extend(other.metadata.clone());
1983
1984 for handler in &other.handlers {
1986 inner_sync.add_handler(handler.clone(), false);
1987 }
1988
1989 Self {
1990 inner: AsyncCallbackManager::from_callback_manager(inner_sync),
1991 parent_run_manager: self.parent_run_manager.clone(),
1992 ended: self.ended,
1993 }
1994 }
1995
1996 pub fn set_handlers(&mut self, handlers: Vec<Arc<dyn BaseCallbackHandler>>, inherit: bool) {
1998 self.inner.set_handlers(handlers, inherit);
1999 }
2000
2001 pub fn add_handler(&mut self, handler: Arc<dyn BaseCallbackHandler>, inherit: bool) {
2003 self.inner.add_handler(handler, inherit);
2004 }
2005
2006 pub fn add_tags(&mut self, tags: Vec<String>, inherit: bool) {
2008 self.inner.add_tags(tags, inherit);
2009 }
2010
2011 pub fn add_metadata(&mut self, metadata: HashMap<String, serde_json::Value>, inherit: bool) {
2013 self.inner.add_metadata(metadata, inherit);
2014 }
2015
2016 pub async fn on_chain_end(&mut self, outputs: &HashMap<String, serde_json::Value>) {
2018 self.ended = true;
2019 self.parent_run_manager.on_chain_end(outputs).await;
2020 }
2021
2022 pub async fn on_chain_error(&mut self, error: &dyn std::error::Error) {
2024 self.ended = true;
2025 self.parent_run_manager.on_chain_error(error).await;
2026 }
2027
2028 pub async fn on_llm_start(
2030 &self,
2031 serialized: &HashMap<String, serde_json::Value>,
2032 prompts: &[String],
2033 run_id: Option<Uuid>,
2034 ) -> Vec<AsyncCallbackManagerForLLMRun> {
2035 self.inner.on_llm_start(serialized, prompts, run_id).await
2036 }
2037
2038 pub async fn on_chat_model_start(
2040 &self,
2041 serialized: &HashMap<String, serde_json::Value>,
2042 messages: &[Vec<BaseMessage>],
2043 run_id: Option<Uuid>,
2044 ) -> Vec<AsyncCallbackManagerForLLMRun> {
2045 self.inner
2046 .on_chat_model_start(serialized, messages, run_id)
2047 .await
2048 }
2049
2050 pub async fn on_chain_start(
2052 &self,
2053 serialized: &HashMap<String, serde_json::Value>,
2054 inputs: &HashMap<String, serde_json::Value>,
2055 run_id: Option<Uuid>,
2056 ) -> AsyncCallbackManagerForChainRun {
2057 self.inner.on_chain_start(serialized, inputs, run_id).await
2058 }
2059
2060 pub async fn on_tool_start(
2062 &self,
2063 serialized: &HashMap<String, serde_json::Value>,
2064 input_str: &str,
2065 run_id: Option<Uuid>,
2066 inputs: Option<&HashMap<String, serde_json::Value>>,
2067 ) -> AsyncCallbackManagerForToolRun {
2068 self.inner
2069 .on_tool_start(serialized, input_str, run_id, inputs)
2070 .await
2071 }
2072
2073 pub async fn on_retriever_start(
2075 &self,
2076 serialized: &HashMap<String, serde_json::Value>,
2077 query: &str,
2078 run_id: Option<Uuid>,
2079 ) -> AsyncCallbackManagerForRetrieverRun {
2080 self.inner
2081 .on_retriever_start(serialized, query, run_id)
2082 .await
2083 }
2084}
2085
2086pub fn trace_as_chain_group<F, R>(
2091 group_name: &str,
2092 callback_manager: Option<CallbackManager>,
2093 inputs: Option<HashMap<String, serde_json::Value>>,
2094 tags: Option<Vec<String>>,
2095 metadata: Option<HashMap<String, serde_json::Value>>,
2096 run_id: Option<Uuid>,
2097 f: F,
2098) -> R
2099where
2100 F: FnOnce(&mut CallbackManagerForChainGroup) -> R,
2101{
2102 let cm = callback_manager.unwrap_or_else(|| {
2103 CallbackManager::configure(
2104 None,
2105 None,
2106 tags.clone(),
2107 None,
2108 metadata.clone(),
2109 None,
2110 false,
2111 )
2112 });
2113
2114 let mut serialized = HashMap::new();
2115 serialized.insert(
2116 "name".to_string(),
2117 serde_json::Value::String(group_name.to_string()),
2118 );
2119
2120 let run_manager = cm.on_chain_start(&serialized, &inputs.clone().unwrap_or_default(), run_id);
2121 let child_cm = run_manager.get_child(None);
2122
2123 let mut group_cm = CallbackManagerForChainGroup::new(
2124 child_cm.handlers.clone(),
2125 Some(child_cm.inheritable_handlers.clone()),
2126 child_cm.parent_run_id,
2127 run_manager.clone(),
2128 Some(child_cm.tags.clone()),
2129 Some(child_cm.inheritable_tags.clone()),
2130 Some(child_cm.metadata.clone()),
2131 Some(child_cm.inheritable_metadata.clone()),
2132 );
2133
2134 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(&mut group_cm)));
2135
2136 match result {
2137 Ok(r) => {
2138 if !group_cm.ended {
2139 run_manager.on_chain_end(&HashMap::new());
2140 }
2141 r
2142 }
2143 Err(e) => {
2144 if !group_cm.ended {
2145 run_manager.on_chain_error(&ChainGroupPanicError);
2146 }
2147 std::panic::resume_unwind(e)
2148 }
2149 }
2150}
2151
2152#[derive(Debug)]
2154struct ChainGroupPanicError;
2155
2156impl std::fmt::Display for ChainGroupPanicError {
2157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2158 write!(f, "Chain group panicked")
2159 }
2160}
2161
2162impl std::error::Error for ChainGroupPanicError {}
2163
2164pub fn dispatch_custom_event(
2170 name: &str,
2171 data: &serde_json::Value,
2172 callback_manager: &CallbackManager,
2173) -> Result<(), &'static str> {
2174 if callback_manager.handlers.is_empty() {
2175 return Ok(());
2176 }
2177
2178 let parent_run_id = callback_manager
2179 .parent_run_id
2180 .ok_or("Unable to dispatch an adhoc event without a parent run id.")?;
2181
2182 let run_id = parent_run_id;
2183
2184 handle_event(
2185 &callback_manager.handlers,
2186 Some(|h: &dyn BaseCallbackHandler| h.ignore_custom_event()),
2187 |_handler| {
2188 let _ = (name, data, run_id);
2189 },
2190 );
2191
2192 Ok(())
2193}
2194
2195pub async fn atrace_as_chain_group<F, Fut, R>(
2214 group_name: &str,
2215 callback_manager: Option<AsyncCallbackManager>,
2216 inputs: Option<HashMap<String, serde_json::Value>>,
2217 tags: Option<Vec<String>>,
2218 metadata: Option<HashMap<String, serde_json::Value>>,
2219 run_id: Option<Uuid>,
2220 f: F,
2221) -> R
2222where
2223 F: FnOnce(AsyncCallbackManagerForChainGroup) -> Fut,
2224 Fut: Future<Output = R>,
2225{
2226 let cm = callback_manager.unwrap_or_else(|| {
2227 AsyncCallbackManager::configure(
2228 None,
2229 None,
2230 tags.clone(),
2231 None,
2232 metadata.clone(),
2233 None,
2234 false,
2235 )
2236 });
2237
2238 let mut serialized = HashMap::new();
2239 serialized.insert(
2240 "name".to_string(),
2241 serde_json::Value::String(group_name.to_string()),
2242 );
2243
2244 let run_manager = cm
2245 .on_chain_start(&serialized, &inputs.clone().unwrap_or_default(), run_id)
2246 .await;
2247 let child_cm = run_manager.get_child(None);
2248
2249 let group_cm = AsyncCallbackManagerForChainGroup::new(
2250 child_cm.handlers().to_vec(),
2251 Some(child_cm.inner.inheritable_handlers.clone()),
2252 child_cm.parent_run_id(),
2253 run_manager.clone(),
2254 Some(child_cm.inner.tags.clone()),
2255 Some(child_cm.inner.inheritable_tags.clone()),
2256 Some(child_cm.inner.metadata.clone()),
2257 Some(child_cm.inner.inheritable_metadata.clone()),
2258 );
2259
2260 let result = f(group_cm.clone()).await;
2261
2262 if !group_cm.ended {
2263 run_manager.on_chain_end(&HashMap::new()).await;
2264 }
2265
2266 result
2267}
2268
2269pub async fn adispatch_custom_event(
2275 name: &str,
2276 data: &serde_json::Value,
2277 callback_manager: &AsyncCallbackManager,
2278) -> Result<(), &'static str> {
2279 if callback_manager.handlers().is_empty() {
2280 return Ok(());
2281 }
2282
2283 let parent_run_id = callback_manager
2284 .parent_run_id()
2285 .ok_or("Unable to dispatch an adhoc event without a parent run id.")?;
2286
2287 let run_id = parent_run_id;
2288
2289 handle_event(
2290 callback_manager.handlers(),
2291 Some(|h: &dyn BaseCallbackHandler| h.ignore_custom_event()),
2292 |_handler| {
2293 let _ = (name, data, run_id);
2294 },
2295 );
2296
2297 Ok(())
2298}