agent_chain_core/callbacks/
manager.rs

1//! Run managers and callback managers for LangChain.
2//!
3//! This module provides the callback manager and run manager types that
4//! handle callback dispatch during LangChain operations.
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::Arc;
9
10use uuid::Uuid;
11
12use crate::messages::BaseMessage;
13use crate::outputs::ChatResult;
14
15use super::base::{BaseCallbackHandler, BaseCallbackManager, Callbacks};
16use crate::utils::uuid::uuid7;
17
18/// Handle an event for the given handlers.
19///
20/// This function dispatches an event to all handlers that don't ignore it.
21pub fn handle_event<F>(
22    handlers: &[Arc<dyn BaseCallbackHandler>],
23    ignore_condition: Option<fn(&dyn BaseCallbackHandler) -> bool>,
24    mut event_fn: F,
25) where
26    F: FnMut(&Arc<dyn BaseCallbackHandler>),
27{
28    for handler in handlers {
29        if let Some(ignore_fn) = ignore_condition
30            && ignore_fn(handler.as_ref())
31        {
32            continue;
33        }
34        event_fn(handler);
35    }
36}
37
38/// Async generic event handler for `AsyncCallbackManager`.
39///
40/// This function dispatches events to handlers asynchronously.
41/// Handlers with `run_inline = true` are run sequentially first,
42/// then non-inline handlers are run concurrently.
43pub async fn ahandle_event<F, Fut>(
44    handlers: &[Arc<dyn BaseCallbackHandler>],
45    ignore_condition: Option<fn(&dyn BaseCallbackHandler) -> bool>,
46    event_fn: F,
47) where
48    F: Fn(&Arc<dyn BaseCallbackHandler>) -> Fut + Send + Sync,
49    Fut: Future<Output = ()> + Send,
50{
51    // First, run inline handlers sequentially
52    for handler in handlers.iter().filter(|h| h.run_inline()) {
53        if let Some(ignore_fn) = ignore_condition
54            && ignore_fn(handler.as_ref())
55        {
56            continue;
57        }
58        event_fn(handler).await;
59    }
60
61    // Then, run non-inline handlers concurrently
62    let non_inline_futures: Vec<_> = handlers
63        .iter()
64        .filter(|h| !h.run_inline())
65        .filter(|h| {
66            if let Some(ignore_fn) = ignore_condition {
67                !ignore_fn(h.as_ref())
68            } else {
69                true
70            }
71        })
72        .map(event_fn)
73        .collect();
74
75    futures::future::join_all(non_inline_futures).await;
76}
77
78/// Base class for run manager (a bound callback manager).
79#[derive(Debug, Clone)]
80pub struct BaseRunManager {
81    /// The ID of the run.
82    pub run_id: Uuid,
83    /// The list of handlers.
84    pub handlers: Vec<Arc<dyn BaseCallbackHandler>>,
85    /// The list of inheritable handlers.
86    pub inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
87    /// The ID of the parent run.
88    pub parent_run_id: Option<Uuid>,
89    /// The list of tags.
90    pub tags: Vec<String>,
91    /// The list of inheritable tags.
92    pub inheritable_tags: Vec<String>,
93    /// The metadata.
94    pub metadata: HashMap<String, serde_json::Value>,
95    /// The inheritable metadata.
96    pub inheritable_metadata: HashMap<String, serde_json::Value>,
97}
98
99impl BaseRunManager {
100    /// Create a new base run manager.
101    #[allow(clippy::too_many_arguments)]
102    pub fn new(
103        run_id: Uuid,
104        handlers: Vec<Arc<dyn BaseCallbackHandler>>,
105        inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
106        parent_run_id: Option<Uuid>,
107        tags: Option<Vec<String>>,
108        inheritable_tags: Option<Vec<String>>,
109        metadata: Option<HashMap<String, serde_json::Value>>,
110        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
111    ) -> Self {
112        Self {
113            run_id,
114            handlers,
115            inheritable_handlers,
116            parent_run_id,
117            tags: tags.unwrap_or_default(),
118            inheritable_tags: inheritable_tags.unwrap_or_default(),
119            metadata: metadata.unwrap_or_default(),
120            inheritable_metadata: inheritable_metadata.unwrap_or_default(),
121        }
122    }
123
124    /// Return a manager that doesn't perform any operations.
125    pub fn get_noop_manager() -> Self {
126        Self {
127            run_id: uuid7(None),
128            handlers: Vec::new(),
129            inheritable_handlers: Vec::new(),
130            parent_run_id: None,
131            tags: Vec::new(),
132            inheritable_tags: Vec::new(),
133            metadata: HashMap::new(),
134            inheritable_metadata: HashMap::new(),
135        }
136    }
137}
138
139/// Sync Run Manager.
140#[derive(Debug, Clone)]
141pub struct RunManager {
142    /// The base run manager.
143    inner: BaseRunManager,
144}
145
146impl RunManager {
147    /// Create a new run manager.
148    #[allow(clippy::too_many_arguments)]
149    pub fn new(
150        run_id: Uuid,
151        handlers: Vec<Arc<dyn BaseCallbackHandler>>,
152        inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
153        parent_run_id: Option<Uuid>,
154        tags: Option<Vec<String>>,
155        inheritable_tags: Option<Vec<String>>,
156        metadata: Option<HashMap<String, serde_json::Value>>,
157        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
158    ) -> Self {
159        Self {
160            inner: BaseRunManager::new(
161                run_id,
162                handlers,
163                inheritable_handlers,
164                parent_run_id,
165                tags,
166                inheritable_tags,
167                metadata,
168                inheritable_metadata,
169            ),
170        }
171    }
172
173    /// Get the run ID.
174    pub fn run_id(&self) -> Uuid {
175        self.inner.run_id
176    }
177
178    /// Get the parent run ID.
179    pub fn parent_run_id(&self) -> Option<Uuid> {
180        self.inner.parent_run_id
181    }
182
183    /// Get the handlers.
184    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
185        &self.inner.handlers
186    }
187
188    /// Get the tags.
189    pub fn tags(&self) -> &[String] {
190        &self.inner.tags
191    }
192
193    /// Run when a text is received.
194    pub fn on_text(&self, text: &str) {
195        if self.inner.handlers.is_empty() {
196            return;
197        }
198        let run_id = self.inner.run_id;
199        let parent_run_id = self.inner.parent_run_id;
200        let tags = self.inner.tags.clone();
201        handle_event(&self.inner.handlers, None, |_handler| {
202            let _ = (text, run_id, parent_run_id, &tags);
203        });
204    }
205
206    /// Run when a retry is received.
207    pub fn on_retry(&self, retry_state: &serde_json::Value) {
208        if self.inner.handlers.is_empty() {
209            return;
210        }
211        let run_id = self.inner.run_id;
212        let parent_run_id = self.inner.parent_run_id;
213        let tags = self.inner.tags.clone();
214        handle_event(
215            &self.inner.handlers,
216            Some(|h: &dyn BaseCallbackHandler| h.ignore_retry()),
217            |_handler| {
218                let _ = (retry_state, run_id, parent_run_id, &tags);
219            },
220        );
221    }
222
223    /// Return a noop manager.
224    pub fn get_noop_manager() -> Self {
225        Self {
226            inner: BaseRunManager::get_noop_manager(),
227        }
228    }
229}
230
231/// Async Run Manager.
232///
233/// This is the async counterpart to `RunManager`.
234#[derive(Debug, Clone)]
235pub struct AsyncRunManager {
236    /// The base run manager.
237    inner: BaseRunManager,
238}
239
240impl AsyncRunManager {
241    /// Create a new async run manager.
242    #[allow(clippy::too_many_arguments)]
243    pub fn new(
244        run_id: Uuid,
245        handlers: Vec<Arc<dyn BaseCallbackHandler>>,
246        inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
247        parent_run_id: Option<Uuid>,
248        tags: Option<Vec<String>>,
249        inheritable_tags: Option<Vec<String>>,
250        metadata: Option<HashMap<String, serde_json::Value>>,
251        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
252    ) -> Self {
253        Self {
254            inner: BaseRunManager::new(
255                run_id,
256                handlers,
257                inheritable_handlers,
258                parent_run_id,
259                tags,
260                inheritable_tags,
261                metadata,
262                inheritable_metadata,
263            ),
264        }
265    }
266
267    /// Get the sync version of this run manager.
268    pub fn get_sync(&self) -> RunManager {
269        RunManager::new(
270            self.inner.run_id,
271            self.inner.handlers.clone(),
272            self.inner.inheritable_handlers.clone(),
273            self.inner.parent_run_id,
274            Some(self.inner.tags.clone()),
275            Some(self.inner.inheritable_tags.clone()),
276            Some(self.inner.metadata.clone()),
277            Some(self.inner.inheritable_metadata.clone()),
278        )
279    }
280
281    /// Get the run ID.
282    pub fn run_id(&self) -> Uuid {
283        self.inner.run_id
284    }
285
286    /// Get the parent run ID.
287    pub fn parent_run_id(&self) -> Option<Uuid> {
288        self.inner.parent_run_id
289    }
290
291    /// Get the handlers.
292    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
293        &self.inner.handlers
294    }
295
296    /// Get the tags.
297    pub fn tags(&self) -> &[String] {
298        &self.inner.tags
299    }
300
301    /// Run when a text is received (async).
302    pub async fn on_text(&self, text: &str) {
303        if self.inner.handlers.is_empty() {
304            return;
305        }
306        let run_id = self.inner.run_id;
307        let parent_run_id = self.inner.parent_run_id;
308        let tags = self.inner.tags.clone();
309        ahandle_event(&self.inner.handlers, None, |_handler| {
310            let _ = (text, run_id, parent_run_id, &tags);
311            async {}
312        })
313        .await;
314    }
315
316    /// Run when a retry is received (async).
317    pub async fn on_retry(&self, retry_state: &serde_json::Value) {
318        if self.inner.handlers.is_empty() {
319            return;
320        }
321        let run_id = self.inner.run_id;
322        let parent_run_id = self.inner.parent_run_id;
323        let tags = self.inner.tags.clone();
324        ahandle_event(
325            &self.inner.handlers,
326            Some(|h: &dyn BaseCallbackHandler| h.ignore_retry()),
327            |_handler| {
328                let _ = (retry_state, run_id, parent_run_id, &tags);
329                async {}
330            },
331        )
332        .await;
333    }
334
335    /// Return a noop manager.
336    pub fn get_noop_manager() -> Self {
337        Self {
338            inner: BaseRunManager::get_noop_manager(),
339        }
340    }
341}
342
343/// Async Parent Run Manager.
344///
345/// This is the async counterpart to `ParentRunManager`.
346#[derive(Debug, Clone)]
347pub struct AsyncParentRunManager {
348    /// The inner async run manager.
349    inner: AsyncRunManager,
350}
351
352impl AsyncParentRunManager {
353    /// Create a new async parent run manager.
354    #[allow(clippy::too_many_arguments)]
355    pub fn new(
356        run_id: Uuid,
357        handlers: Vec<Arc<dyn BaseCallbackHandler>>,
358        inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
359        parent_run_id: Option<Uuid>,
360        tags: Option<Vec<String>>,
361        inheritable_tags: Option<Vec<String>>,
362        metadata: Option<HashMap<String, serde_json::Value>>,
363        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
364    ) -> Self {
365        Self {
366            inner: AsyncRunManager::new(
367                run_id,
368                handlers,
369                inheritable_handlers,
370                parent_run_id,
371                tags,
372                inheritable_tags,
373                metadata,
374                inheritable_metadata,
375            ),
376        }
377    }
378
379    /// Get a child async callback manager.
380    pub fn get_child(&self, tag: Option<&str>) -> AsyncCallbackManager {
381        let mut manager = AsyncCallbackManager::new();
382        manager.inner.parent_run_id = Some(self.inner.run_id());
383        manager.set_handlers(self.inner.inner.inheritable_handlers.clone(), true);
384        manager.add_tags(self.inner.inner.inheritable_tags.clone(), true);
385        manager.add_metadata(self.inner.inner.inheritable_metadata.clone(), true);
386        if let Some(tag) = tag {
387            manager.add_tags(vec![tag.to_string()], false);
388        }
389        manager
390    }
391
392    /// Get the sync version.
393    pub fn get_sync(&self) -> ParentRunManager {
394        ParentRunManager::new(
395            self.inner.inner.run_id,
396            self.inner.inner.handlers.clone(),
397            self.inner.inner.inheritable_handlers.clone(),
398            self.inner.inner.parent_run_id,
399            Some(self.inner.inner.tags.clone()),
400            Some(self.inner.inner.inheritable_tags.clone()),
401            Some(self.inner.inner.metadata.clone()),
402            Some(self.inner.inner.inheritable_metadata.clone()),
403        )
404    }
405
406    /// Get the run ID.
407    pub fn run_id(&self) -> Uuid {
408        self.inner.run_id()
409    }
410
411    /// Get the parent run ID.
412    pub fn parent_run_id(&self) -> Option<Uuid> {
413        self.inner.parent_run_id()
414    }
415
416    /// Get the handlers.
417    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
418        self.inner.handlers()
419    }
420
421    /// Get the tags.
422    pub fn tags(&self) -> &[String] {
423        self.inner.tags()
424    }
425
426    /// Return a noop manager.
427    pub fn get_noop_manager() -> Self {
428        Self {
429            inner: AsyncRunManager::get_noop_manager(),
430        }
431    }
432}
433
434/// Sync Parent Run Manager.
435#[derive(Debug, Clone)]
436pub struct ParentRunManager {
437    /// The inner run manager.
438    inner: RunManager,
439}
440
441impl ParentRunManager {
442    /// Create a new parent run manager.
443    #[allow(clippy::too_many_arguments)]
444    pub fn new(
445        run_id: Uuid,
446        handlers: Vec<Arc<dyn BaseCallbackHandler>>,
447        inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
448        parent_run_id: Option<Uuid>,
449        tags: Option<Vec<String>>,
450        inheritable_tags: Option<Vec<String>>,
451        metadata: Option<HashMap<String, serde_json::Value>>,
452        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
453    ) -> Self {
454        Self {
455            inner: RunManager::new(
456                run_id,
457                handlers,
458                inheritable_handlers,
459                parent_run_id,
460                tags,
461                inheritable_tags,
462                metadata,
463                inheritable_metadata,
464            ),
465        }
466    }
467
468    /// Get a child callback manager.
469    pub fn get_child(&self, tag: Option<&str>) -> CallbackManager {
470        let mut manager = CallbackManager::new();
471        manager.parent_run_id = Some(self.inner.run_id());
472        manager.set_handlers(self.inner.inner.inheritable_handlers.clone(), true);
473        manager.add_tags(self.inner.inner.inheritable_tags.clone(), true);
474        manager.add_metadata(self.inner.inner.inheritable_metadata.clone(), true);
475        if let Some(tag) = tag {
476            manager.add_tags(vec![tag.to_string()], false);
477        }
478        manager
479    }
480
481    /// Get the run ID.
482    pub fn run_id(&self) -> Uuid {
483        self.inner.run_id()
484    }
485
486    /// Get the parent run ID.
487    pub fn parent_run_id(&self) -> Option<Uuid> {
488        self.inner.parent_run_id()
489    }
490
491    /// Get the handlers.
492    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
493        self.inner.handlers()
494    }
495
496    /// Get the tags.
497    pub fn tags(&self) -> &[String] {
498        self.inner.tags()
499    }
500
501    /// Return a noop manager.
502    pub fn get_noop_manager() -> Self {
503        Self {
504            inner: RunManager::get_noop_manager(),
505        }
506    }
507}
508
509/// Callback manager for LLM run.
510#[derive(Debug, Clone)]
511pub struct CallbackManagerForLLMRun {
512    /// The inner run manager.
513    inner: RunManager,
514}
515
516impl CallbackManagerForLLMRun {
517    /// Create a new callback manager for LLM run.
518    #[allow(clippy::too_many_arguments)]
519    pub fn new(
520        run_id: Uuid,
521        handlers: Vec<Arc<dyn BaseCallbackHandler>>,
522        inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
523        parent_run_id: Option<Uuid>,
524        tags: Option<Vec<String>>,
525        inheritable_tags: Option<Vec<String>>,
526        metadata: Option<HashMap<String, serde_json::Value>>,
527        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
528    ) -> Self {
529        Self {
530            inner: RunManager::new(
531                run_id,
532                handlers,
533                inheritable_handlers,
534                parent_run_id,
535                tags,
536                inheritable_tags,
537                metadata,
538                inheritable_metadata,
539            ),
540        }
541    }
542
543    /// Get the run ID.
544    pub fn run_id(&self) -> Uuid {
545        self.inner.run_id()
546    }
547
548    /// Get the parent run ID.
549    pub fn parent_run_id(&self) -> Option<Uuid> {
550        self.inner.parent_run_id()
551    }
552
553    /// Get the handlers.
554    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
555        self.inner.handlers()
556    }
557
558    /// Get the tags.
559    pub fn tags(&self) -> &[String] {
560        self.inner.tags()
561    }
562
563    /// Run when LLM generates a new token.
564    pub fn on_llm_new_token(&self, token: &str, chunk: Option<&serde_json::Value>) {
565        if self.inner.inner.handlers.is_empty() {
566            return;
567        }
568        let run_id = self.inner.run_id();
569        let parent_run_id = self.inner.parent_run_id();
570        let tags = self.inner.tags().to_vec();
571        handle_event(
572            self.inner.handlers(),
573            Some(|h: &dyn BaseCallbackHandler| h.ignore_llm()),
574            |_handler| {
575                let _ = (token, run_id, parent_run_id, chunk, &tags);
576            },
577        );
578    }
579
580    /// Run when LLM ends running.
581    pub fn on_llm_end(&self, response: &ChatResult) {
582        if self.inner.inner.handlers.is_empty() {
583            return;
584        }
585        let run_id = self.inner.run_id();
586        let parent_run_id = self.inner.parent_run_id();
587        let tags = self.inner.tags().to_vec();
588        handle_event(
589            self.inner.handlers(),
590            Some(|h: &dyn BaseCallbackHandler| h.ignore_llm()),
591            |_handler| {
592                let _ = (response, run_id, parent_run_id, &tags);
593            },
594        );
595    }
596
597    /// Run when LLM errors.
598    pub fn on_llm_error(&self, error: &dyn std::error::Error) {
599        if self.inner.inner.handlers.is_empty() {
600            return;
601        }
602        let run_id = self.inner.run_id();
603        let parent_run_id = self.inner.parent_run_id();
604        let tags = self.inner.tags().to_vec();
605        handle_event(
606            self.inner.handlers(),
607            Some(|h: &dyn BaseCallbackHandler| h.ignore_llm()),
608            |_handler| {
609                let _ = (error, run_id, parent_run_id, &tags);
610            },
611        );
612    }
613
614    /// Return a noop manager.
615    pub fn get_noop_manager() -> Self {
616        Self {
617            inner: RunManager::get_noop_manager(),
618        }
619    }
620}
621
622/// Callback manager for chain run.
623#[derive(Debug, Clone)]
624pub struct CallbackManagerForChainRun {
625    /// The inner parent run manager.
626    inner: ParentRunManager,
627}
628
629impl CallbackManagerForChainRun {
630    /// Create a new callback manager for chain run.
631    #[allow(clippy::too_many_arguments)]
632    pub fn new(
633        run_id: Uuid,
634        handlers: Vec<Arc<dyn BaseCallbackHandler>>,
635        inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
636        parent_run_id: Option<Uuid>,
637        tags: Option<Vec<String>>,
638        inheritable_tags: Option<Vec<String>>,
639        metadata: Option<HashMap<String, serde_json::Value>>,
640        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
641    ) -> Self {
642        Self {
643            inner: ParentRunManager::new(
644                run_id,
645                handlers,
646                inheritable_handlers,
647                parent_run_id,
648                tags,
649                inheritable_tags,
650                metadata,
651                inheritable_metadata,
652            ),
653        }
654    }
655
656    /// Get the run ID.
657    pub fn run_id(&self) -> Uuid {
658        self.inner.run_id()
659    }
660
661    /// Get the parent run ID.
662    pub fn parent_run_id(&self) -> Option<Uuid> {
663        self.inner.parent_run_id()
664    }
665
666    /// Get the handlers.
667    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
668        self.inner.handlers()
669    }
670
671    /// Get the tags.
672    pub fn tags(&self) -> &[String] {
673        self.inner.tags()
674    }
675
676    /// Get a child callback manager.
677    pub fn get_child(&self, tag: Option<&str>) -> CallbackManager {
678        self.inner.get_child(tag)
679    }
680
681    /// Run when chain ends running.
682    pub fn on_chain_end(&self, outputs: &HashMap<String, serde_json::Value>) {
683        if self.inner.inner.inner.handlers.is_empty() {
684            return;
685        }
686        let run_id = self.inner.run_id();
687        let parent_run_id = self.inner.parent_run_id();
688        let tags = self.inner.tags().to_vec();
689        handle_event(
690            self.inner.handlers(),
691            Some(|h: &dyn BaseCallbackHandler| h.ignore_chain()),
692            |_handler| {
693                let _ = (outputs, run_id, parent_run_id, &tags);
694            },
695        );
696    }
697
698    /// Run when chain errors.
699    pub fn on_chain_error(&self, error: &dyn std::error::Error) {
700        if self.inner.inner.inner.handlers.is_empty() {
701            return;
702        }
703        let run_id = self.inner.run_id();
704        let parent_run_id = self.inner.parent_run_id();
705        let tags = self.inner.tags().to_vec();
706        handle_event(
707            self.inner.handlers(),
708            Some(|h: &dyn BaseCallbackHandler| h.ignore_chain()),
709            |_handler| {
710                let _ = (error, run_id, parent_run_id, &tags);
711            },
712        );
713    }
714
715    /// Run when agent action is received.
716    pub fn on_agent_action(&self, action: &serde_json::Value) {
717        if self.inner.inner.inner.handlers.is_empty() {
718            return;
719        }
720        let run_id = self.inner.run_id();
721        let parent_run_id = self.inner.parent_run_id();
722        let tags = self.inner.tags().to_vec();
723        handle_event(
724            self.inner.handlers(),
725            Some(|h: &dyn BaseCallbackHandler| h.ignore_agent()),
726            |_handler| {
727                let _ = (action, run_id, parent_run_id, &tags);
728            },
729        );
730    }
731
732    /// Run when agent finish is received.
733    pub fn on_agent_finish(&self, finish: &serde_json::Value) {
734        if self.inner.inner.inner.handlers.is_empty() {
735            return;
736        }
737        let run_id = self.inner.run_id();
738        let parent_run_id = self.inner.parent_run_id();
739        let tags = self.inner.tags().to_vec();
740        handle_event(
741            self.inner.handlers(),
742            Some(|h: &dyn BaseCallbackHandler| h.ignore_agent()),
743            |_handler| {
744                let _ = (finish, run_id, parent_run_id, &tags);
745            },
746        );
747    }
748
749    /// Return a noop manager.
750    pub fn get_noop_manager() -> Self {
751        Self {
752            inner: ParentRunManager::get_noop_manager(),
753        }
754    }
755}
756
757/// Callback manager for tool run.
758#[derive(Debug, Clone)]
759pub struct CallbackManagerForToolRun {
760    /// The inner parent run manager.
761    inner: ParentRunManager,
762}
763
764impl CallbackManagerForToolRun {
765    /// Create a new callback manager for tool run.
766    #[allow(clippy::too_many_arguments)]
767    pub fn new(
768        run_id: Uuid,
769        handlers: Vec<Arc<dyn BaseCallbackHandler>>,
770        inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
771        parent_run_id: Option<Uuid>,
772        tags: Option<Vec<String>>,
773        inheritable_tags: Option<Vec<String>>,
774        metadata: Option<HashMap<String, serde_json::Value>>,
775        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
776    ) -> Self {
777        Self {
778            inner: ParentRunManager::new(
779                run_id,
780                handlers,
781                inheritable_handlers,
782                parent_run_id,
783                tags,
784                inheritable_tags,
785                metadata,
786                inheritable_metadata,
787            ),
788        }
789    }
790
791    /// Get the run ID.
792    pub fn run_id(&self) -> Uuid {
793        self.inner.run_id()
794    }
795
796    /// Get the parent run ID.
797    pub fn parent_run_id(&self) -> Option<Uuid> {
798        self.inner.parent_run_id()
799    }
800
801    /// Get the handlers.
802    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
803        self.inner.handlers()
804    }
805
806    /// Get the tags.
807    pub fn tags(&self) -> &[String] {
808        self.inner.tags()
809    }
810
811    /// Get a child callback manager.
812    pub fn get_child(&self, tag: Option<&str>) -> CallbackManager {
813        self.inner.get_child(tag)
814    }
815
816    /// Run when tool ends running.
817    pub fn on_tool_end(&self, output: &str) {
818        if self.inner.inner.inner.handlers.is_empty() {
819            return;
820        }
821        let run_id = self.inner.run_id();
822        let parent_run_id = self.inner.parent_run_id();
823        let tags = self.inner.tags().to_vec();
824        handle_event(
825            self.inner.handlers(),
826            Some(|h: &dyn BaseCallbackHandler| h.ignore_agent()),
827            |_handler| {
828                let _ = (output, run_id, parent_run_id, &tags);
829            },
830        );
831    }
832
833    /// Run when tool errors.
834    pub fn on_tool_error(&self, error: &dyn std::error::Error) {
835        if self.inner.inner.inner.handlers.is_empty() {
836            return;
837        }
838        let run_id = self.inner.run_id();
839        let parent_run_id = self.inner.parent_run_id();
840        let tags = self.inner.tags().to_vec();
841        handle_event(
842            self.inner.handlers(),
843            Some(|h: &dyn BaseCallbackHandler| h.ignore_agent()),
844            |_handler| {
845                let _ = (error, run_id, parent_run_id, &tags);
846            },
847        );
848    }
849
850    /// Return a noop manager.
851    pub fn get_noop_manager() -> Self {
852        Self {
853            inner: ParentRunManager::get_noop_manager(),
854        }
855    }
856}
857
858/// Callback manager for retriever run.
859#[derive(Debug, Clone)]
860pub struct CallbackManagerForRetrieverRun {
861    /// The inner parent run manager.
862    inner: ParentRunManager,
863}
864
865impl CallbackManagerForRetrieverRun {
866    /// Create a new callback manager for retriever run.
867    #[allow(clippy::too_many_arguments)]
868    pub fn new(
869        run_id: Uuid,
870        handlers: Vec<Arc<dyn BaseCallbackHandler>>,
871        inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
872        parent_run_id: Option<Uuid>,
873        tags: Option<Vec<String>>,
874        inheritable_tags: Option<Vec<String>>,
875        metadata: Option<HashMap<String, serde_json::Value>>,
876        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
877    ) -> Self {
878        Self {
879            inner: ParentRunManager::new(
880                run_id,
881                handlers,
882                inheritable_handlers,
883                parent_run_id,
884                tags,
885                inheritable_tags,
886                metadata,
887                inheritable_metadata,
888            ),
889        }
890    }
891
892    /// Get the run ID.
893    pub fn run_id(&self) -> Uuid {
894        self.inner.run_id()
895    }
896
897    /// Get the parent run ID.
898    pub fn parent_run_id(&self) -> Option<Uuid> {
899        self.inner.parent_run_id()
900    }
901
902    /// Get the handlers.
903    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
904        self.inner.handlers()
905    }
906
907    /// Get the tags.
908    pub fn tags(&self) -> &[String] {
909        self.inner.tags()
910    }
911
912    /// Get a child callback manager.
913    pub fn get_child(&self, tag: Option<&str>) -> CallbackManager {
914        self.inner.get_child(tag)
915    }
916
917    /// Run when retriever ends running.
918    pub fn on_retriever_end(&self, documents: &[serde_json::Value]) {
919        if self.inner.inner.inner.handlers.is_empty() {
920            return;
921        }
922        let run_id = self.inner.run_id();
923        let parent_run_id = self.inner.parent_run_id();
924        let tags = self.inner.tags().to_vec();
925        handle_event(
926            self.inner.handlers(),
927            Some(|h: &dyn BaseCallbackHandler| h.ignore_retriever()),
928            |_handler| {
929                let _ = (documents, run_id, parent_run_id, &tags);
930            },
931        );
932    }
933
934    /// Run when retriever errors.
935    pub fn on_retriever_error(&self, error: &dyn std::error::Error) {
936        if self.inner.inner.inner.handlers.is_empty() {
937            return;
938        }
939        let run_id = self.inner.run_id();
940        let parent_run_id = self.inner.parent_run_id();
941        let tags = self.inner.tags().to_vec();
942        handle_event(
943            self.inner.handlers(),
944            Some(|h: &dyn BaseCallbackHandler| h.ignore_retriever()),
945            |_handler| {
946                let _ = (error, run_id, parent_run_id, &tags);
947            },
948        );
949    }
950
951    /// Return a noop manager.
952    pub fn get_noop_manager() -> Self {
953        Self {
954            inner: ParentRunManager::get_noop_manager(),
955        }
956    }
957}
958
959/// Callback manager for LangChain.
960#[derive(Debug, Clone, Default)]
961pub struct CallbackManager {
962    /// The handlers.
963    pub handlers: Vec<Arc<dyn BaseCallbackHandler>>,
964    /// The inheritable handlers.
965    pub inheritable_handlers: Vec<Arc<dyn BaseCallbackHandler>>,
966    /// The parent run ID.
967    pub parent_run_id: Option<Uuid>,
968    /// The tags.
969    pub tags: Vec<String>,
970    /// The inheritable tags.
971    pub inheritable_tags: Vec<String>,
972    /// The metadata.
973    pub metadata: HashMap<String, serde_json::Value>,
974    /// The inheritable metadata.
975    pub inheritable_metadata: HashMap<String, serde_json::Value>,
976}
977
978impl CallbackManager {
979    /// Create a new callback manager.
980    pub fn new() -> Self {
981        Self::default()
982    }
983
984    /// Create a callback manager from a base callback manager.
985    pub fn from_base(base: BaseCallbackManager) -> Self {
986        Self {
987            handlers: base.handlers,
988            inheritable_handlers: base.inheritable_handlers,
989            parent_run_id: base.parent_run_id,
990            tags: base.tags,
991            inheritable_tags: base.inheritable_tags,
992            metadata: base.metadata,
993            inheritable_metadata: base.inheritable_metadata,
994        }
995    }
996
997    /// Set handlers.
998    pub fn set_handlers(&mut self, handlers: Vec<Arc<dyn BaseCallbackHandler>>, inherit: bool) {
999        self.handlers = handlers.clone();
1000        if inherit {
1001            self.inheritable_handlers = handlers;
1002        }
1003    }
1004
1005    /// Add handler.
1006    pub fn add_handler(&mut self, handler: Arc<dyn BaseCallbackHandler>, inherit: bool) {
1007        if !self
1008            .handlers
1009            .iter()
1010            .any(|h| std::ptr::eq(h.as_ref(), handler.as_ref()))
1011        {
1012            self.handlers.push(handler.clone());
1013        }
1014        if inherit
1015            && !self
1016                .inheritable_handlers
1017                .iter()
1018                .any(|h| std::ptr::eq(h.as_ref(), handler.as_ref()))
1019        {
1020            self.inheritable_handlers.push(handler);
1021        }
1022    }
1023
1024    /// Add tags.
1025    pub fn add_tags(&mut self, tags: Vec<String>, inherit: bool) {
1026        for tag in &tags {
1027            if !self.tags.contains(tag) {
1028                self.tags.push(tag.clone());
1029            }
1030        }
1031        if inherit {
1032            for tag in tags {
1033                if !self.inheritable_tags.contains(&tag) {
1034                    self.inheritable_tags.push(tag);
1035                }
1036            }
1037        }
1038    }
1039
1040    /// Add metadata.
1041    pub fn add_metadata(&mut self, metadata: HashMap<String, serde_json::Value>, inherit: bool) {
1042        self.metadata.extend(metadata.clone());
1043        if inherit {
1044            self.inheritable_metadata.extend(metadata);
1045        }
1046    }
1047
1048    /// Run when LLM starts running.
1049    pub fn on_llm_start(
1050        &self,
1051        serialized: &HashMap<String, serde_json::Value>,
1052        prompts: &[String],
1053        run_id: Option<Uuid>,
1054    ) -> Vec<CallbackManagerForLLMRun> {
1055        let mut managers = Vec::new();
1056
1057        for (i, _prompt) in prompts.iter().enumerate() {
1058            let run_id = if i == 0
1059                && let Some(run_id) = run_id
1060            {
1061                run_id
1062            } else {
1063                uuid7(None)
1064            };
1065
1066            handle_event(
1067                &self.handlers,
1068                Some(|h: &dyn BaseCallbackHandler| h.ignore_llm()),
1069                |_handler| {
1070                    let _ = (serialized, run_id);
1071                },
1072            );
1073
1074            managers.push(CallbackManagerForLLMRun::new(
1075                run_id,
1076                self.handlers.clone(),
1077                self.inheritable_handlers.clone(),
1078                self.parent_run_id,
1079                Some(self.tags.clone()),
1080                Some(self.inheritable_tags.clone()),
1081                Some(self.metadata.clone()),
1082                Some(self.inheritable_metadata.clone()),
1083            ));
1084        }
1085
1086        managers
1087    }
1088
1089    /// Run when chat model starts running.
1090    pub fn on_chat_model_start(
1091        &self,
1092        serialized: &HashMap<String, serde_json::Value>,
1093        messages: &[Vec<BaseMessage>],
1094        run_id: Option<Uuid>,
1095    ) -> Vec<CallbackManagerForLLMRun> {
1096        let mut managers = Vec::new();
1097        let mut current_run_id = run_id;
1098
1099        for _message_list in messages {
1100            let run_id = current_run_id.unwrap_or_else(|| uuid7(None));
1101            current_run_id = None;
1102
1103            handle_event(
1104                &self.handlers,
1105                Some(|h: &dyn BaseCallbackHandler| h.ignore_chat_model()),
1106                |_handler| {
1107                    let _ = (serialized, run_id);
1108                },
1109            );
1110
1111            managers.push(CallbackManagerForLLMRun::new(
1112                run_id,
1113                self.handlers.clone(),
1114                self.inheritable_handlers.clone(),
1115                self.parent_run_id,
1116                Some(self.tags.clone()),
1117                Some(self.inheritable_tags.clone()),
1118                Some(self.metadata.clone()),
1119                Some(self.inheritable_metadata.clone()),
1120            ));
1121        }
1122
1123        managers
1124    }
1125
1126    /// Run when chain starts running.
1127    pub fn on_chain_start(
1128        &self,
1129        serialized: &HashMap<String, serde_json::Value>,
1130        inputs: &HashMap<String, serde_json::Value>,
1131        run_id: Option<Uuid>,
1132    ) -> CallbackManagerForChainRun {
1133        let run_id = run_id.unwrap_or_else(|| uuid7(None));
1134
1135        handle_event(
1136            &self.handlers,
1137            Some(|h: &dyn BaseCallbackHandler| h.ignore_chain()),
1138            |_handler| {
1139                let _ = (serialized, inputs, run_id);
1140            },
1141        );
1142
1143        CallbackManagerForChainRun::new(
1144            run_id,
1145            self.handlers.clone(),
1146            self.inheritable_handlers.clone(),
1147            self.parent_run_id,
1148            Some(self.tags.clone()),
1149            Some(self.inheritable_tags.clone()),
1150            Some(self.metadata.clone()),
1151            Some(self.inheritable_metadata.clone()),
1152        )
1153    }
1154
1155    /// Run when tool starts running.
1156    pub fn on_tool_start(
1157        &self,
1158        serialized: &HashMap<String, serde_json::Value>,
1159        input_str: &str,
1160        run_id: Option<Uuid>,
1161        inputs: Option<&HashMap<String, serde_json::Value>>,
1162    ) -> CallbackManagerForToolRun {
1163        let run_id = run_id.unwrap_or_else(|| uuid7(None));
1164
1165        handle_event(
1166            &self.handlers,
1167            Some(|h: &dyn BaseCallbackHandler| h.ignore_agent()),
1168            |_handler| {
1169                let _ = (serialized, input_str, run_id, inputs);
1170            },
1171        );
1172
1173        CallbackManagerForToolRun::new(
1174            run_id,
1175            self.handlers.clone(),
1176            self.inheritable_handlers.clone(),
1177            self.parent_run_id,
1178            Some(self.tags.clone()),
1179            Some(self.inheritable_tags.clone()),
1180            Some(self.metadata.clone()),
1181            Some(self.inheritable_metadata.clone()),
1182        )
1183    }
1184
1185    /// Run when retriever starts running.
1186    pub fn on_retriever_start(
1187        &self,
1188        serialized: &HashMap<String, serde_json::Value>,
1189        query: &str,
1190        run_id: Option<Uuid>,
1191    ) -> CallbackManagerForRetrieverRun {
1192        let run_id = run_id.unwrap_or_else(|| uuid7(None));
1193
1194        handle_event(
1195            &self.handlers,
1196            Some(|h: &dyn BaseCallbackHandler| h.ignore_retriever()),
1197            |_handler| {
1198                let _ = (serialized, query, run_id);
1199            },
1200        );
1201
1202        CallbackManagerForRetrieverRun::new(
1203            run_id,
1204            self.handlers.clone(),
1205            self.inheritable_handlers.clone(),
1206            self.parent_run_id,
1207            Some(self.tags.clone()),
1208            Some(self.inheritable_tags.clone()),
1209            Some(self.metadata.clone()),
1210            Some(self.inheritable_metadata.clone()),
1211        )
1212    }
1213
1214    /// Dispatch a custom event.
1215    pub fn on_custom_event(&self, name: &str, data: &serde_json::Value, run_id: Option<Uuid>) {
1216        if self.handlers.is_empty() {
1217            return;
1218        }
1219
1220        let run_id = run_id.unwrap_or_else(|| uuid7(None));
1221
1222        handle_event(
1223            &self.handlers,
1224            Some(|h: &dyn BaseCallbackHandler| h.ignore_custom_event()),
1225            |_handler| {
1226                let _ = (name, data, run_id);
1227            },
1228        );
1229    }
1230
1231    /// Configure the callback manager.
1232    pub fn configure(
1233        inheritable_callbacks: Option<Callbacks>,
1234        local_callbacks: Option<Callbacks>,
1235        inheritable_tags: Option<Vec<String>>,
1236        local_tags: Option<Vec<String>>,
1237        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
1238        local_metadata: Option<HashMap<String, serde_json::Value>>,
1239        _verbose: bool,
1240    ) -> Self {
1241        let mut callback_manager = Self::new();
1242
1243        if let Some(callbacks) = inheritable_callbacks {
1244            match callbacks {
1245                Callbacks::Handlers(handlers) => {
1246                    callback_manager.handlers = handlers.clone();
1247                    callback_manager.inheritable_handlers = handlers;
1248                }
1249                Callbacks::Manager(manager) => {
1250                    callback_manager.handlers = manager.handlers.clone();
1251                    callback_manager.inheritable_handlers = manager.inheritable_handlers.clone();
1252                    callback_manager.parent_run_id = manager.parent_run_id;
1253                    callback_manager.tags = manager.tags.clone();
1254                    callback_manager.inheritable_tags = manager.inheritable_tags.clone();
1255                    callback_manager.metadata = manager.metadata.clone();
1256                    callback_manager.inheritable_metadata = manager.inheritable_metadata.clone();
1257                }
1258            }
1259        }
1260
1261        if let Some(callbacks) = local_callbacks {
1262            match callbacks {
1263                Callbacks::Handlers(handlers) => {
1264                    for handler in handlers {
1265                        callback_manager.add_handler(handler, false);
1266                    }
1267                }
1268                Callbacks::Manager(manager) => {
1269                    for handler in manager.handlers {
1270                        callback_manager.add_handler(handler, false);
1271                    }
1272                }
1273            }
1274        }
1275
1276        if let Some(tags) = inheritable_tags {
1277            callback_manager.add_tags(tags, true);
1278        }
1279        if let Some(tags) = local_tags {
1280            callback_manager.add_tags(tags, false);
1281        }
1282        if let Some(metadata) = inheritable_metadata {
1283            callback_manager.add_metadata(metadata, true);
1284        }
1285        if let Some(metadata) = local_metadata {
1286            callback_manager.add_metadata(metadata, false);
1287        }
1288
1289        callback_manager
1290    }
1291}
1292
1293/// Async callback manager for LangChain.
1294#[derive(Debug, Clone, Default)]
1295pub struct AsyncCallbackManager {
1296    /// The inner callback manager.
1297    inner: CallbackManager,
1298}
1299
1300impl AsyncCallbackManager {
1301    /// Create a new async callback manager.
1302    pub fn new() -> Self {
1303        Self::default()
1304    }
1305
1306    /// Create from a callback manager.
1307    pub fn from_callback_manager(manager: CallbackManager) -> Self {
1308        Self { inner: manager }
1309    }
1310
1311    /// Get the handlers.
1312    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1313        &self.inner.handlers
1314    }
1315
1316    /// Get the parent run ID.
1317    pub fn parent_run_id(&self) -> Option<Uuid> {
1318        self.inner.parent_run_id
1319    }
1320
1321    /// Set handlers.
1322    pub fn set_handlers(&mut self, handlers: Vec<Arc<dyn BaseCallbackHandler>>, inherit: bool) {
1323        self.inner.set_handlers(handlers, inherit);
1324    }
1325
1326    /// Add handler.
1327    pub fn add_handler(&mut self, handler: Arc<dyn BaseCallbackHandler>, inherit: bool) {
1328        self.inner.add_handler(handler, inherit);
1329    }
1330
1331    /// Add tags.
1332    pub fn add_tags(&mut self, tags: Vec<String>, inherit: bool) {
1333        self.inner.add_tags(tags, inherit);
1334    }
1335
1336    /// Add metadata.
1337    pub fn add_metadata(&mut self, metadata: HashMap<String, serde_json::Value>, inherit: bool) {
1338        self.inner.add_metadata(metadata, inherit);
1339    }
1340
1341    /// Whether this is async.
1342    pub fn is_async(&self) -> bool {
1343        true
1344    }
1345
1346    /// Run when LLM starts running (async).
1347    pub async fn on_llm_start(
1348        &self,
1349        serialized: &HashMap<String, serde_json::Value>,
1350        prompts: &[String],
1351        run_id: Option<Uuid>,
1352    ) -> Vec<AsyncCallbackManagerForLLMRun> {
1353        self.inner
1354            .on_llm_start(serialized, prompts, run_id)
1355            .into_iter()
1356            .map(AsyncCallbackManagerForLLMRun::from_sync)
1357            .collect()
1358    }
1359
1360    /// Run when chat model starts running (async).
1361    pub async fn on_chat_model_start(
1362        &self,
1363        serialized: &HashMap<String, serde_json::Value>,
1364        messages: &[Vec<BaseMessage>],
1365        run_id: Option<Uuid>,
1366    ) -> Vec<AsyncCallbackManagerForLLMRun> {
1367        self.inner
1368            .on_chat_model_start(serialized, messages, run_id)
1369            .into_iter()
1370            .map(AsyncCallbackManagerForLLMRun::from_sync)
1371            .collect()
1372    }
1373
1374    /// Run when chain starts running (async).
1375    pub async fn on_chain_start(
1376        &self,
1377        serialized: &HashMap<String, serde_json::Value>,
1378        inputs: &HashMap<String, serde_json::Value>,
1379        run_id: Option<Uuid>,
1380    ) -> AsyncCallbackManagerForChainRun {
1381        AsyncCallbackManagerForChainRun::from_sync(
1382            self.inner.on_chain_start(serialized, inputs, run_id),
1383        )
1384    }
1385
1386    /// Run when tool starts running (async).
1387    pub async fn on_tool_start(
1388        &self,
1389        serialized: &HashMap<String, serde_json::Value>,
1390        input_str: &str,
1391        run_id: Option<Uuid>,
1392        inputs: Option<&HashMap<String, serde_json::Value>>,
1393    ) -> AsyncCallbackManagerForToolRun {
1394        AsyncCallbackManagerForToolRun::from_sync(
1395            self.inner
1396                .on_tool_start(serialized, input_str, run_id, inputs),
1397        )
1398    }
1399
1400    /// Run when retriever starts running (async).
1401    pub async fn on_retriever_start(
1402        &self,
1403        serialized: &HashMap<String, serde_json::Value>,
1404        query: &str,
1405        run_id: Option<Uuid>,
1406    ) -> AsyncCallbackManagerForRetrieverRun {
1407        AsyncCallbackManagerForRetrieverRun::from_sync(
1408            self.inner.on_retriever_start(serialized, query, run_id),
1409        )
1410    }
1411
1412    /// Dispatch a custom event (async).
1413    pub async fn on_custom_event(
1414        &self,
1415        name: &str,
1416        data: &serde_json::Value,
1417        run_id: Option<Uuid>,
1418    ) {
1419        if self.inner.handlers.is_empty() {
1420            return;
1421        }
1422
1423        let run_id = run_id.unwrap_or_else(|| uuid7(None));
1424
1425        handle_event(
1426            &self.inner.handlers,
1427            Some(|h: &dyn BaseCallbackHandler| h.ignore_custom_event()),
1428            |_handler| {
1429                let _ = (name, data, run_id);
1430            },
1431        );
1432    }
1433
1434    /// Configure the async callback manager.
1435    pub fn configure(
1436        inheritable_callbacks: Option<Callbacks>,
1437        local_callbacks: Option<Callbacks>,
1438        inheritable_tags: Option<Vec<String>>,
1439        local_tags: Option<Vec<String>>,
1440        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
1441        local_metadata: Option<HashMap<String, serde_json::Value>>,
1442        verbose: bool,
1443    ) -> Self {
1444        Self {
1445            inner: CallbackManager::configure(
1446                inheritable_callbacks,
1447                local_callbacks,
1448                inheritable_tags,
1449                local_tags,
1450                inheritable_metadata,
1451                local_metadata,
1452                verbose,
1453            ),
1454        }
1455    }
1456}
1457
1458/// Async callback manager for LLM run.
1459#[derive(Debug, Clone)]
1460pub struct AsyncCallbackManagerForLLMRun {
1461    /// The inner sync callback manager.
1462    inner: CallbackManagerForLLMRun,
1463}
1464
1465impl AsyncCallbackManagerForLLMRun {
1466    /// Create from sync callback manager.
1467    pub fn from_sync(inner: CallbackManagerForLLMRun) -> Self {
1468        Self { inner }
1469    }
1470
1471    /// Get the sync version.
1472    pub fn get_sync(&self) -> CallbackManagerForLLMRun {
1473        self.inner.clone()
1474    }
1475
1476    /// Get the run ID.
1477    pub fn run_id(&self) -> Uuid {
1478        self.inner.run_id()
1479    }
1480
1481    /// Get the parent run ID.
1482    pub fn parent_run_id(&self) -> Option<Uuid> {
1483        self.inner.parent_run_id()
1484    }
1485
1486    /// Get the handlers.
1487    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1488        self.inner.handlers()
1489    }
1490
1491    /// Run when LLM generates a new token (async).
1492    pub async fn on_llm_new_token(&self, token: &str, chunk: Option<&serde_json::Value>) {
1493        self.inner.on_llm_new_token(token, chunk);
1494    }
1495
1496    /// Run when LLM ends running (async).
1497    pub async fn on_llm_end(&self, response: &ChatResult) {
1498        self.inner.on_llm_end(response);
1499    }
1500
1501    /// Run when LLM errors (async).
1502    pub async fn on_llm_error(&self, error: &dyn std::error::Error) {
1503        self.inner.on_llm_error(error);
1504    }
1505
1506    /// Return a noop manager.
1507    pub fn get_noop_manager() -> Self {
1508        Self {
1509            inner: CallbackManagerForLLMRun::get_noop_manager(),
1510        }
1511    }
1512}
1513
1514/// Async callback manager for chain run.
1515#[derive(Debug, Clone)]
1516pub struct AsyncCallbackManagerForChainRun {
1517    /// The inner sync callback manager.
1518    inner: CallbackManagerForChainRun,
1519}
1520
1521impl AsyncCallbackManagerForChainRun {
1522    /// Create from sync callback manager.
1523    pub fn from_sync(inner: CallbackManagerForChainRun) -> Self {
1524        Self { inner }
1525    }
1526
1527    /// Get the sync version.
1528    pub fn get_sync(&self) -> CallbackManagerForChainRun {
1529        self.inner.clone()
1530    }
1531
1532    /// Get the run ID.
1533    pub fn run_id(&self) -> Uuid {
1534        self.inner.run_id()
1535    }
1536
1537    /// Get the parent run ID.
1538    pub fn parent_run_id(&self) -> Option<Uuid> {
1539        self.inner.parent_run_id()
1540    }
1541
1542    /// Get the handlers.
1543    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1544        self.inner.handlers()
1545    }
1546
1547    /// Get a child callback manager.
1548    pub fn get_child(&self, tag: Option<&str>) -> AsyncCallbackManager {
1549        AsyncCallbackManager::from_callback_manager(self.inner.get_child(tag))
1550    }
1551
1552    /// Run when chain ends running (async).
1553    pub async fn on_chain_end(&self, outputs: &HashMap<String, serde_json::Value>) {
1554        self.inner.on_chain_end(outputs);
1555    }
1556
1557    /// Run when chain errors (async).
1558    pub async fn on_chain_error(&self, error: &dyn std::error::Error) {
1559        self.inner.on_chain_error(error);
1560    }
1561
1562    /// Run when agent action is received (async).
1563    pub async fn on_agent_action(&self, action: &serde_json::Value) {
1564        self.inner.on_agent_action(action);
1565    }
1566
1567    /// Run when agent finish is received (async).
1568    pub async fn on_agent_finish(&self, finish: &serde_json::Value) {
1569        self.inner.on_agent_finish(finish);
1570    }
1571
1572    /// Return a noop manager.
1573    pub fn get_noop_manager() -> Self {
1574        Self {
1575            inner: CallbackManagerForChainRun::get_noop_manager(),
1576        }
1577    }
1578}
1579
1580/// Async callback manager for tool run.
1581#[derive(Debug, Clone)]
1582pub struct AsyncCallbackManagerForToolRun {
1583    /// The inner sync callback manager.
1584    inner: CallbackManagerForToolRun,
1585}
1586
1587impl AsyncCallbackManagerForToolRun {
1588    /// Create from sync callback manager.
1589    pub fn from_sync(inner: CallbackManagerForToolRun) -> Self {
1590        Self { inner }
1591    }
1592
1593    /// Get the sync version.
1594    pub fn get_sync(&self) -> CallbackManagerForToolRun {
1595        self.inner.clone()
1596    }
1597
1598    /// Get the run ID.
1599    pub fn run_id(&self) -> Uuid {
1600        self.inner.run_id()
1601    }
1602
1603    /// Get the parent run ID.
1604    pub fn parent_run_id(&self) -> Option<Uuid> {
1605        self.inner.parent_run_id()
1606    }
1607
1608    /// Get the handlers.
1609    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1610        self.inner.handlers()
1611    }
1612
1613    /// Get a child callback manager.
1614    pub fn get_child(&self, tag: Option<&str>) -> AsyncCallbackManager {
1615        AsyncCallbackManager::from_callback_manager(self.inner.get_child(tag))
1616    }
1617
1618    /// Run when tool ends running (async).
1619    pub async fn on_tool_end(&self, output: &str) {
1620        self.inner.on_tool_end(output);
1621    }
1622
1623    /// Run when tool errors (async).
1624    pub async fn on_tool_error(&self, error: &dyn std::error::Error) {
1625        self.inner.on_tool_error(error);
1626    }
1627
1628    /// Return a noop manager.
1629    pub fn get_noop_manager() -> Self {
1630        Self {
1631            inner: CallbackManagerForToolRun::get_noop_manager(),
1632        }
1633    }
1634}
1635
1636/// Async callback manager for retriever run.
1637#[derive(Debug, Clone)]
1638pub struct AsyncCallbackManagerForRetrieverRun {
1639    /// The inner sync callback manager.
1640    inner: CallbackManagerForRetrieverRun,
1641}
1642
1643impl AsyncCallbackManagerForRetrieverRun {
1644    /// Create from sync callback manager.
1645    pub fn from_sync(inner: CallbackManagerForRetrieverRun) -> Self {
1646        Self { inner }
1647    }
1648
1649    /// Get the sync version.
1650    pub fn get_sync(&self) -> CallbackManagerForRetrieverRun {
1651        self.inner.clone()
1652    }
1653
1654    /// Get the run ID.
1655    pub fn run_id(&self) -> Uuid {
1656        self.inner.run_id()
1657    }
1658
1659    /// Get the parent run ID.
1660    pub fn parent_run_id(&self) -> Option<Uuid> {
1661        self.inner.parent_run_id()
1662    }
1663
1664    /// Get the handlers.
1665    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1666        self.inner.handlers()
1667    }
1668
1669    /// Get a child callback manager.
1670    pub fn get_child(&self, tag: Option<&str>) -> AsyncCallbackManager {
1671        AsyncCallbackManager::from_callback_manager(self.inner.get_child(tag))
1672    }
1673
1674    /// Run when retriever ends running (async).
1675    pub async fn on_retriever_end(&self, documents: &[serde_json::Value]) {
1676        self.inner.on_retriever_end(documents);
1677    }
1678
1679    /// Run when retriever errors (async).
1680    pub async fn on_retriever_error(&self, error: &dyn std::error::Error) {
1681        self.inner.on_retriever_error(error);
1682    }
1683
1684    /// Return a noop manager.
1685    pub fn get_noop_manager() -> Self {
1686        Self {
1687            inner: CallbackManagerForRetrieverRun::get_noop_manager(),
1688        }
1689    }
1690}
1691
1692#[cfg(test)]
1693mod tests {
1694    use super::*;
1695
1696    #[test]
1697    fn test_callback_manager_on_chain_start() {
1698        let manager = CallbackManager::new();
1699        let run_manager = manager.on_chain_start(&HashMap::new(), &HashMap::new(), None);
1700
1701        assert!(!run_manager.run_id().is_nil());
1702    }
1703
1704    #[test]
1705    fn test_callback_manager_configure() {
1706        let manager = CallbackManager::configure(
1707            None,
1708            None,
1709            Some(vec!["tag1".to_string()]),
1710            Some(vec!["tag2".to_string()]),
1711            None,
1712            None,
1713            false,
1714        );
1715
1716        assert!(manager.tags.contains(&"tag1".to_string()));
1717        assert!(manager.tags.contains(&"tag2".to_string()));
1718        assert!(manager.inheritable_tags.contains(&"tag1".to_string()));
1719        assert!(!manager.inheritable_tags.contains(&"tag2".to_string()));
1720    }
1721}
1722
1723/// Callback manager for chain group.
1724///
1725/// This manager is used for grouping different calls together as a single run
1726/// even if they aren't composed in a single chain.
1727#[derive(Debug, Clone)]
1728pub struct CallbackManagerForChainGroup {
1729    /// The inner callback manager.
1730    inner: CallbackManager,
1731    /// The parent run manager.
1732    parent_run_manager: CallbackManagerForChainRun,
1733    /// Whether the chain group has ended.
1734    pub ended: bool,
1735}
1736
1737impl CallbackManagerForChainGroup {
1738    /// Create a new callback manager for chain group.
1739    #[allow(clippy::too_many_arguments)]
1740    pub fn new(
1741        handlers: Vec<Arc<dyn BaseCallbackHandler>>,
1742        inheritable_handlers: Option<Vec<Arc<dyn BaseCallbackHandler>>>,
1743        parent_run_id: Option<Uuid>,
1744        parent_run_manager: CallbackManagerForChainRun,
1745        tags: Option<Vec<String>>,
1746        inheritable_tags: Option<Vec<String>>,
1747        metadata: Option<HashMap<String, serde_json::Value>>,
1748        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
1749    ) -> Self {
1750        let mut inner = CallbackManager::new();
1751        inner.handlers = handlers;
1752        inner.inheritable_handlers = inheritable_handlers.unwrap_or_default();
1753        inner.parent_run_id = parent_run_id;
1754        inner.tags = tags.unwrap_or_default();
1755        inner.inheritable_tags = inheritable_tags.unwrap_or_default();
1756        inner.metadata = metadata.unwrap_or_default();
1757        inner.inheritable_metadata = inheritable_metadata.unwrap_or_default();
1758
1759        Self {
1760            inner,
1761            parent_run_manager,
1762            ended: false,
1763        }
1764    }
1765
1766    /// Get the handlers.
1767    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1768        &self.inner.handlers
1769    }
1770
1771    /// Get the parent run ID.
1772    pub fn parent_run_id(&self) -> Option<Uuid> {
1773        self.inner.parent_run_id
1774    }
1775
1776    /// Get the tags.
1777    pub fn tags(&self) -> &[String] {
1778        &self.inner.tags
1779    }
1780
1781    /// Copy the callback manager.
1782    pub fn copy(&self) -> Self {
1783        Self {
1784            inner: self.inner.clone(),
1785            parent_run_manager: self.parent_run_manager.clone(),
1786            ended: self.ended,
1787        }
1788    }
1789
1790    /// Merge with another callback manager.
1791    pub fn merge(&self, other: &CallbackManager) -> Self {
1792        let mut merged_inner = self.inner.clone();
1793
1794        // Merge tags (deduplicated)
1795        for tag in &other.tags {
1796            if !merged_inner.tags.contains(tag) {
1797                merged_inner.tags.push(tag.clone());
1798            }
1799        }
1800        for tag in &other.inheritable_tags {
1801            if !merged_inner.inheritable_tags.contains(tag) {
1802                merged_inner.inheritable_tags.push(tag.clone());
1803            }
1804        }
1805
1806        // Merge metadata
1807        merged_inner.metadata.extend(other.metadata.clone());
1808
1809        // Merge handlers
1810        for handler in &other.handlers {
1811            merged_inner.add_handler(handler.clone(), false);
1812        }
1813
1814        Self {
1815            inner: merged_inner,
1816            parent_run_manager: self.parent_run_manager.clone(),
1817            ended: self.ended,
1818        }
1819    }
1820
1821    /// Set handlers.
1822    pub fn set_handlers(&mut self, handlers: Vec<Arc<dyn BaseCallbackHandler>>, inherit: bool) {
1823        self.inner.set_handlers(handlers, inherit);
1824    }
1825
1826    /// Add handler.
1827    pub fn add_handler(&mut self, handler: Arc<dyn BaseCallbackHandler>, inherit: bool) {
1828        self.inner.add_handler(handler, inherit);
1829    }
1830
1831    /// Add tags.
1832    pub fn add_tags(&mut self, tags: Vec<String>, inherit: bool) {
1833        self.inner.add_tags(tags, inherit);
1834    }
1835
1836    /// Add metadata.
1837    pub fn add_metadata(&mut self, metadata: HashMap<String, serde_json::Value>, inherit: bool) {
1838        self.inner.add_metadata(metadata, inherit);
1839    }
1840
1841    /// Run when chain ends running.
1842    pub fn on_chain_end(&mut self, outputs: &HashMap<String, serde_json::Value>) {
1843        self.ended = true;
1844        self.parent_run_manager.on_chain_end(outputs);
1845    }
1846
1847    /// Run when chain errors.
1848    pub fn on_chain_error(&mut self, error: &dyn std::error::Error) {
1849        self.ended = true;
1850        self.parent_run_manager.on_chain_error(error);
1851    }
1852
1853    /// Run when LLM starts running.
1854    pub fn on_llm_start(
1855        &self,
1856        serialized: &HashMap<String, serde_json::Value>,
1857        prompts: &[String],
1858        run_id: Option<Uuid>,
1859    ) -> Vec<CallbackManagerForLLMRun> {
1860        self.inner.on_llm_start(serialized, prompts, run_id)
1861    }
1862
1863    /// Run when chat model starts running.
1864    pub fn on_chat_model_start(
1865        &self,
1866        serialized: &HashMap<String, serde_json::Value>,
1867        messages: &[Vec<BaseMessage>],
1868        run_id: Option<Uuid>,
1869    ) -> Vec<CallbackManagerForLLMRun> {
1870        self.inner.on_chat_model_start(serialized, messages, run_id)
1871    }
1872
1873    /// Run when chain starts running.
1874    pub fn on_chain_start(
1875        &self,
1876        serialized: &HashMap<String, serde_json::Value>,
1877        inputs: &HashMap<String, serde_json::Value>,
1878        run_id: Option<Uuid>,
1879    ) -> CallbackManagerForChainRun {
1880        self.inner.on_chain_start(serialized, inputs, run_id)
1881    }
1882
1883    /// Run when tool starts running.
1884    pub fn on_tool_start(
1885        &self,
1886        serialized: &HashMap<String, serde_json::Value>,
1887        input_str: &str,
1888        run_id: Option<Uuid>,
1889        inputs: Option<&HashMap<String, serde_json::Value>>,
1890    ) -> CallbackManagerForToolRun {
1891        self.inner
1892            .on_tool_start(serialized, input_str, run_id, inputs)
1893    }
1894
1895    /// Run when retriever starts running.
1896    pub fn on_retriever_start(
1897        &self,
1898        serialized: &HashMap<String, serde_json::Value>,
1899        query: &str,
1900        run_id: Option<Uuid>,
1901    ) -> CallbackManagerForRetrieverRun {
1902        self.inner.on_retriever_start(serialized, query, run_id)
1903    }
1904}
1905
1906/// Async callback manager for chain group.
1907#[derive(Debug, Clone)]
1908pub struct AsyncCallbackManagerForChainGroup {
1909    /// The inner callback manager.
1910    inner: AsyncCallbackManager,
1911    /// The parent run manager.
1912    parent_run_manager: AsyncCallbackManagerForChainRun,
1913    /// Whether the chain group has ended.
1914    pub ended: bool,
1915}
1916
1917impl AsyncCallbackManagerForChainGroup {
1918    /// Create a new async callback manager for chain group.
1919    #[allow(clippy::too_many_arguments)]
1920    pub fn new(
1921        handlers: Vec<Arc<dyn BaseCallbackHandler>>,
1922        inheritable_handlers: Option<Vec<Arc<dyn BaseCallbackHandler>>>,
1923        parent_run_id: Option<Uuid>,
1924        parent_run_manager: AsyncCallbackManagerForChainRun,
1925        tags: Option<Vec<String>>,
1926        inheritable_tags: Option<Vec<String>>,
1927        metadata: Option<HashMap<String, serde_json::Value>>,
1928        inheritable_metadata: Option<HashMap<String, serde_json::Value>>,
1929    ) -> Self {
1930        let mut inner_sync = CallbackManager::new();
1931        inner_sync.handlers = handlers;
1932        inner_sync.inheritable_handlers = inheritable_handlers.unwrap_or_default();
1933        inner_sync.parent_run_id = parent_run_id;
1934        inner_sync.tags = tags.unwrap_or_default();
1935        inner_sync.inheritable_tags = inheritable_tags.unwrap_or_default();
1936        inner_sync.metadata = metadata.unwrap_or_default();
1937        inner_sync.inheritable_metadata = inheritable_metadata.unwrap_or_default();
1938
1939        Self {
1940            inner: AsyncCallbackManager::from_callback_manager(inner_sync),
1941            parent_run_manager,
1942            ended: false,
1943        }
1944    }
1945
1946    /// Get the handlers.
1947    pub fn handlers(&self) -> &[Arc<dyn BaseCallbackHandler>] {
1948        self.inner.handlers()
1949    }
1950
1951    /// Get the parent run ID.
1952    pub fn parent_run_id(&self) -> Option<Uuid> {
1953        self.inner.parent_run_id()
1954    }
1955
1956    /// Copy the callback manager.
1957    pub fn copy(&self) -> Self {
1958        Self {
1959            inner: self.inner.clone(),
1960            parent_run_manager: self.parent_run_manager.clone(),
1961            ended: self.ended,
1962        }
1963    }
1964
1965    /// Merge with another callback manager.
1966    pub fn merge(&self, other: &CallbackManager) -> Self {
1967        let mut inner_sync = self.inner.inner.clone();
1968
1969        // Merge tags (deduplicated)
1970        for tag in &other.tags {
1971            if !inner_sync.tags.contains(tag) {
1972                inner_sync.tags.push(tag.clone());
1973            }
1974        }
1975        for tag in &other.inheritable_tags {
1976            if !inner_sync.inheritable_tags.contains(tag) {
1977                inner_sync.inheritable_tags.push(tag.clone());
1978            }
1979        }
1980
1981        // Merge metadata
1982        inner_sync.metadata.extend(other.metadata.clone());
1983
1984        // Merge handlers
1985        for handler in &other.handlers {
1986            inner_sync.add_handler(handler.clone(), false);
1987        }
1988
1989        Self {
1990            inner: AsyncCallbackManager::from_callback_manager(inner_sync),
1991            parent_run_manager: self.parent_run_manager.clone(),
1992            ended: self.ended,
1993        }
1994    }
1995
1996    /// Set handlers.
1997    pub fn set_handlers(&mut self, handlers: Vec<Arc<dyn BaseCallbackHandler>>, inherit: bool) {
1998        self.inner.set_handlers(handlers, inherit);
1999    }
2000
2001    /// Add handler.
2002    pub fn add_handler(&mut self, handler: Arc<dyn BaseCallbackHandler>, inherit: bool) {
2003        self.inner.add_handler(handler, inherit);
2004    }
2005
2006    /// Add tags.
2007    pub fn add_tags(&mut self, tags: Vec<String>, inherit: bool) {
2008        self.inner.add_tags(tags, inherit);
2009    }
2010
2011    /// Add metadata.
2012    pub fn add_metadata(&mut self, metadata: HashMap<String, serde_json::Value>, inherit: bool) {
2013        self.inner.add_metadata(metadata, inherit);
2014    }
2015
2016    /// Run when chain ends running (async).
2017    pub async fn on_chain_end(&mut self, outputs: &HashMap<String, serde_json::Value>) {
2018        self.ended = true;
2019        self.parent_run_manager.on_chain_end(outputs).await;
2020    }
2021
2022    /// Run when chain errors (async).
2023    pub async fn on_chain_error(&mut self, error: &dyn std::error::Error) {
2024        self.ended = true;
2025        self.parent_run_manager.on_chain_error(error).await;
2026    }
2027
2028    /// Run when LLM starts running (async).
2029    pub async fn on_llm_start(
2030        &self,
2031        serialized: &HashMap<String, serde_json::Value>,
2032        prompts: &[String],
2033        run_id: Option<Uuid>,
2034    ) -> Vec<AsyncCallbackManagerForLLMRun> {
2035        self.inner.on_llm_start(serialized, prompts, run_id).await
2036    }
2037
2038    /// Run when chat model starts running (async).
2039    pub async fn on_chat_model_start(
2040        &self,
2041        serialized: &HashMap<String, serde_json::Value>,
2042        messages: &[Vec<BaseMessage>],
2043        run_id: Option<Uuid>,
2044    ) -> Vec<AsyncCallbackManagerForLLMRun> {
2045        self.inner
2046            .on_chat_model_start(serialized, messages, run_id)
2047            .await
2048    }
2049
2050    /// Run when chain starts running (async).
2051    pub async fn on_chain_start(
2052        &self,
2053        serialized: &HashMap<String, serde_json::Value>,
2054        inputs: &HashMap<String, serde_json::Value>,
2055        run_id: Option<Uuid>,
2056    ) -> AsyncCallbackManagerForChainRun {
2057        self.inner.on_chain_start(serialized, inputs, run_id).await
2058    }
2059
2060    /// Run when tool starts running (async).
2061    pub async fn on_tool_start(
2062        &self,
2063        serialized: &HashMap<String, serde_json::Value>,
2064        input_str: &str,
2065        run_id: Option<Uuid>,
2066        inputs: Option<&HashMap<String, serde_json::Value>>,
2067    ) -> AsyncCallbackManagerForToolRun {
2068        self.inner
2069            .on_tool_start(serialized, input_str, run_id, inputs)
2070            .await
2071    }
2072
2073    /// Run when retriever starts running (async).
2074    pub async fn on_retriever_start(
2075        &self,
2076        serialized: &HashMap<String, serde_json::Value>,
2077        query: &str,
2078        run_id: Option<Uuid>,
2079    ) -> AsyncCallbackManagerForRetrieverRun {
2080        self.inner
2081            .on_retriever_start(serialized, query, run_id)
2082            .await
2083    }
2084}
2085
2086/// Get a callback manager for a chain group.
2087///
2088/// Useful for grouping different calls together as a single run even if
2089/// they aren't composed in a single chain.
2090pub fn trace_as_chain_group<F, R>(
2091    group_name: &str,
2092    callback_manager: Option<CallbackManager>,
2093    inputs: Option<HashMap<String, serde_json::Value>>,
2094    tags: Option<Vec<String>>,
2095    metadata: Option<HashMap<String, serde_json::Value>>,
2096    run_id: Option<Uuid>,
2097    f: F,
2098) -> R
2099where
2100    F: FnOnce(&mut CallbackManagerForChainGroup) -> R,
2101{
2102    let cm = callback_manager.unwrap_or_else(|| {
2103        CallbackManager::configure(
2104            None,
2105            None,
2106            tags.clone(),
2107            None,
2108            metadata.clone(),
2109            None,
2110            false,
2111        )
2112    });
2113
2114    let mut serialized = HashMap::new();
2115    serialized.insert(
2116        "name".to_string(),
2117        serde_json::Value::String(group_name.to_string()),
2118    );
2119
2120    let run_manager = cm.on_chain_start(&serialized, &inputs.clone().unwrap_or_default(), run_id);
2121    let child_cm = run_manager.get_child(None);
2122
2123    let mut group_cm = CallbackManagerForChainGroup::new(
2124        child_cm.handlers.clone(),
2125        Some(child_cm.inheritable_handlers.clone()),
2126        child_cm.parent_run_id,
2127        run_manager.clone(),
2128        Some(child_cm.tags.clone()),
2129        Some(child_cm.inheritable_tags.clone()),
2130        Some(child_cm.metadata.clone()),
2131        Some(child_cm.inheritable_metadata.clone()),
2132    );
2133
2134    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(&mut group_cm)));
2135
2136    match result {
2137        Ok(r) => {
2138            if !group_cm.ended {
2139                run_manager.on_chain_end(&HashMap::new());
2140            }
2141            r
2142        }
2143        Err(e) => {
2144            if !group_cm.ended {
2145                run_manager.on_chain_error(&ChainGroupPanicError);
2146            }
2147            std::panic::resume_unwind(e)
2148        }
2149    }
2150}
2151
2152/// Error type for chain group panic.
2153#[derive(Debug)]
2154struct ChainGroupPanicError;
2155
2156impl std::fmt::Display for ChainGroupPanicError {
2157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2158        write!(f, "Chain group panicked")
2159    }
2160}
2161
2162impl std::error::Error for ChainGroupPanicError {}
2163
2164/// Dispatch an adhoc event to the handlers (sync version).
2165///
2166/// This event should NOT be used in any internal LangChain code. The event
2167/// is meant specifically for users of the library to dispatch custom
2168/// events that are tailored to their application.
2169pub fn dispatch_custom_event(
2170    name: &str,
2171    data: &serde_json::Value,
2172    callback_manager: &CallbackManager,
2173) -> Result<(), &'static str> {
2174    if callback_manager.handlers.is_empty() {
2175        return Ok(());
2176    }
2177
2178    let parent_run_id = callback_manager
2179        .parent_run_id
2180        .ok_or("Unable to dispatch an adhoc event without a parent run id.")?;
2181
2182    let run_id = parent_run_id;
2183
2184    handle_event(
2185        &callback_manager.handlers,
2186        Some(|h: &dyn BaseCallbackHandler| h.ignore_custom_event()),
2187        |_handler| {
2188            let _ = (name, data, run_id);
2189        },
2190    );
2191
2192    Ok(())
2193}
2194
2195/// Get an async callback manager for a chain group in an async context.
2196///
2197/// Useful for grouping different async calls together as a single run even if
2198/// they aren't composed in a single chain.
2199///
2200/// # Arguments
2201///
2202/// * `group_name` - The name of the chain group.
2203/// * `callback_manager` - Optional async callback manager to use.
2204/// * `inputs` - Optional inputs to the chain group.
2205/// * `tags` - Optional inheritable tags to apply to all runs.
2206/// * `metadata` - Optional metadata to apply to all runs.
2207/// * `run_id` - Optional run ID.
2208/// * `f` - The async function to execute with the chain group manager.
2209///
2210/// # Returns
2211///
2212/// The result of the async function.
2213pub async fn atrace_as_chain_group<F, Fut, R>(
2214    group_name: &str,
2215    callback_manager: Option<AsyncCallbackManager>,
2216    inputs: Option<HashMap<String, serde_json::Value>>,
2217    tags: Option<Vec<String>>,
2218    metadata: Option<HashMap<String, serde_json::Value>>,
2219    run_id: Option<Uuid>,
2220    f: F,
2221) -> R
2222where
2223    F: FnOnce(AsyncCallbackManagerForChainGroup) -> Fut,
2224    Fut: Future<Output = R>,
2225{
2226    let cm = callback_manager.unwrap_or_else(|| {
2227        AsyncCallbackManager::configure(
2228            None,
2229            None,
2230            tags.clone(),
2231            None,
2232            metadata.clone(),
2233            None,
2234            false,
2235        )
2236    });
2237
2238    let mut serialized = HashMap::new();
2239    serialized.insert(
2240        "name".to_string(),
2241        serde_json::Value::String(group_name.to_string()),
2242    );
2243
2244    let run_manager = cm
2245        .on_chain_start(&serialized, &inputs.clone().unwrap_or_default(), run_id)
2246        .await;
2247    let child_cm = run_manager.get_child(None);
2248
2249    let group_cm = AsyncCallbackManagerForChainGroup::new(
2250        child_cm.handlers().to_vec(),
2251        Some(child_cm.inner.inheritable_handlers.clone()),
2252        child_cm.parent_run_id(),
2253        run_manager.clone(),
2254        Some(child_cm.inner.tags.clone()),
2255        Some(child_cm.inner.inheritable_tags.clone()),
2256        Some(child_cm.inner.metadata.clone()),
2257        Some(child_cm.inner.inheritable_metadata.clone()),
2258    );
2259
2260    let result = f(group_cm.clone()).await;
2261
2262    if !group_cm.ended {
2263        run_manager.on_chain_end(&HashMap::new()).await;
2264    }
2265
2266    result
2267}
2268
2269/// Dispatch an adhoc event to the handlers (async version).
2270///
2271/// This event should NOT be used in any internal LangChain code. The event
2272/// is meant specifically for users of the library to dispatch custom
2273/// events that are tailored to their application.
2274pub async fn adispatch_custom_event(
2275    name: &str,
2276    data: &serde_json::Value,
2277    callback_manager: &AsyncCallbackManager,
2278) -> Result<(), &'static str> {
2279    if callback_manager.handlers().is_empty() {
2280        return Ok(());
2281    }
2282
2283    let parent_run_id = callback_manager
2284        .parent_run_id()
2285        .ok_or("Unable to dispatch an adhoc event without a parent run id.")?;
2286
2287    let run_id = parent_run_id;
2288
2289    handle_event(
2290        callback_manager.handlers(),
2291        Some(|h: &dyn BaseCallbackHandler| h.ignore_custom_event()),
2292        |_handler| {
2293            let _ = (name, data, run_id);
2294        },
2295    );
2296
2297    Ok(())
2298}