Skip to main content

nemo_flow/api/
registry.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Middleware registry helpers for global and scope-local guardrails,
5//! intercepts, and subscribers.
6
7use crate::api::runtime::{
8    LlmConditionalFn, LlmExecutionFn, LlmRequestInterceptFn, LlmSanitizeRequestFn,
9    LlmSanitizeResponseFn, LlmStreamExecutionFn, ToolConditionalFn, ToolExecutionFn,
10    ToolInterceptFn, ToolSanitizeFn,
11};
12use crate::api::runtime::{current_scope_stack, global_context};
13use crate::api::shared::ensure_runtime_owner;
14use crate::error::{FlowError, Result};
15
16/// A priority-ordered request intercept registration entry.
17pub struct Intercept<F> {
18    /// Lower values run earlier in the chain.
19    pub priority: i32,
20    /// Whether this intercept stops later request intercepts after it returns.
21    pub break_chain: bool,
22    /// The caller-provided intercept callback.
23    pub callable: F,
24}
25
26/// A priority-ordered execution intercept registration entry.
27pub struct ExecutionIntercept<F> {
28    /// Lower values run earlier in the chain.
29    pub priority: i32,
30    /// The caller-provided execution intercept callback.
31    pub callable: F,
32}
33
34/// A priority-ordered guardrail registration entry.
35pub struct GuardrailEntry<F> {
36    /// Lower values run earlier in the chain.
37    pub priority: i32,
38    /// The caller-provided guardrail callback.
39    pub guardrail: F,
40}
41
42macro_rules! global_guardrail_registry_api {
43    (
44        $(#[$register_meta:meta])*
45        $register_name:ident,
46        $(#[$deregister_meta:meta])*
47        $deregister_name:ident,
48        $field:ident,
49        $fn_type:ty
50    ) => {
51        $(#[$register_meta])*
52        ///
53        /// # Parameters
54        /// - `name`: Unique middleware name in the global registry.
55        /// - `priority`: Lower values run earlier in the chain.
56        /// - `guardrail`: Guardrail callback stored under `name`.
57        ///
58        /// # Returns
59        /// A [`Result`] that is `Ok(())` when the guardrail was registered.
60        ///
61        /// # Errors
62        /// Returns [`FlowError::AlreadyExists`] when the name is already in
63        /// use or an internal error if the runtime state cannot be updated.
64        pub fn $register_name(name: &str, priority: i32, guardrail: $fn_type) -> Result<()> {
65            ensure_runtime_owner()?;
66            let context = global_context();
67            let mut state = context
68                .write()
69                .map_err(|error| FlowError::Internal(error.to_string()))?;
70            state
71                .$field
72                .register(name.to_string(), GuardrailEntry { priority, guardrail })
73                .map_err(FlowError::AlreadyExists)
74        }
75
76        $(#[$deregister_meta])*
77        ///
78        /// # Parameters
79        /// - `name`: Global middleware name to remove.
80        ///
81        /// # Returns
82        /// A [`Result`] containing `true` when a guardrail was removed and
83        /// `false` when the name was not registered.
84        ///
85        /// # Errors
86        /// Returns an internal error if the runtime state cannot be updated.
87        pub fn $deregister_name(name: &str) -> Result<bool> {
88            ensure_runtime_owner()?;
89            let context = global_context();
90            let mut state = context
91                .write()
92                .map_err(|error| FlowError::Internal(error.to_string()))?;
93            Ok(state.$field.deregister(name))
94        }
95    };
96}
97
98macro_rules! global_intercept_registry_api {
99    (
100        $(#[$register_meta:meta])*
101        $register_name:ident,
102        $(#[$deregister_meta:meta])*
103        $deregister_name:ident,
104        $field:ident,
105        $fn_type:ty
106    ) => {
107        $(#[$register_meta])*
108        ///
109        /// # Parameters
110        /// - `name`: Unique middleware name in the global registry.
111        /// - `priority`: Lower values run earlier in the chain.
112        /// - `break_chain`: Whether the intercept should stop later request
113        ///   intercepts after it returns.
114        /// - `callable`: Intercept callback stored under `name`.
115        ///
116        /// # Returns
117        /// A [`Result`] that is `Ok(())` when the intercept was registered.
118        ///
119        /// # Errors
120        /// Returns [`FlowError::AlreadyExists`] when the name is already in
121        /// use or an internal error if the runtime state cannot be updated.
122        pub fn $register_name(
123            name: &str,
124            priority: i32,
125            break_chain: bool,
126            callable: $fn_type,
127        ) -> Result<()> {
128            ensure_runtime_owner()?;
129            let context = global_context();
130            let mut state = context
131                .write()
132                .map_err(|error| FlowError::Internal(error.to_string()))?;
133            state
134                .$field
135                .register(
136                    name.to_string(),
137                    Intercept {
138                        priority,
139                        break_chain,
140                        callable,
141                    },
142                )
143                .map_err(FlowError::AlreadyExists)
144        }
145
146        $(#[$deregister_meta])*
147        ///
148        /// # Parameters
149        /// - `name`: Global middleware name to remove.
150        ///
151        /// # Returns
152        /// A [`Result`] containing `true` when an intercept was removed and
153        /// `false` when the name was not registered.
154        ///
155        /// # Errors
156        /// Returns an internal error if the runtime state cannot be updated.
157        pub fn $deregister_name(name: &str) -> Result<bool> {
158            ensure_runtime_owner()?;
159            let context = global_context();
160            let mut state = context
161                .write()
162                .map_err(|error| FlowError::Internal(error.to_string()))?;
163            Ok(state.$field.deregister(name))
164        }
165    };
166}
167
168macro_rules! global_execution_registry_api {
169    (
170        $(#[$register_meta:meta])*
171        $register_name:ident,
172        $(#[$deregister_meta:meta])*
173        $deregister_name:ident,
174        $field:ident,
175        $fn_type:ty
176    ) => {
177        $(#[$register_meta])*
178        ///
179        /// # Parameters
180        /// - `name`: Unique middleware name in the global registry.
181        /// - `priority`: Lower values run earlier in the chain.
182        /// - `callable`: Execution intercept callback stored under `name`.
183        ///
184        /// # Returns
185        /// A [`Result`] that is `Ok(())` when the intercept was registered.
186        ///
187        /// # Errors
188        /// Returns [`FlowError::AlreadyExists`] when the name is already in
189        /// use or an internal error if the runtime state cannot be updated.
190        pub fn $register_name(name: &str, priority: i32, callable: $fn_type) -> Result<()> {
191            ensure_runtime_owner()?;
192            let context = global_context();
193            let mut state = context
194                .write()
195                .map_err(|error| FlowError::Internal(error.to_string()))?;
196            state
197                .$field
198                .register(name.to_string(), ExecutionIntercept { priority, callable })
199                .map_err(FlowError::AlreadyExists)
200        }
201
202        $(#[$deregister_meta])*
203        ///
204        /// # Parameters
205        /// - `name`: Global middleware name to remove.
206        ///
207        /// # Returns
208        /// A [`Result`] containing `true` when an execution intercept was
209        /// removed and `false` when the name was not registered.
210        ///
211        /// # Errors
212        /// Returns an internal error if the runtime state cannot be updated.
213        pub fn $deregister_name(name: &str) -> Result<bool> {
214            ensure_runtime_owner()?;
215            let context = global_context();
216            let mut state = context
217                .write()
218                .map_err(|error| FlowError::Internal(error.to_string()))?;
219            Ok(state.$field.deregister(name))
220        }
221    };
222}
223
224macro_rules! scope_guardrail_registry_api {
225    (
226        $(#[$register_meta:meta])*
227        $register_name:ident,
228        $(#[$deregister_meta:meta])*
229        $deregister_name:ident,
230        $field:ident,
231        $fn_type:ty
232    ) => {
233        $(#[$register_meta])*
234        ///
235        /// # Parameters
236        /// - `scope_uuid`: UUID of the active scope that owns the middleware.
237        /// - `name`: Unique middleware name within that scope.
238        /// - `priority`: Lower values run earlier in the chain.
239        /// - `guardrail`: Guardrail callback stored under `name`.
240        ///
241        /// # Returns
242        /// A [`Result`] that is `Ok(())` when the guardrail was registered.
243        ///
244        /// # Errors
245        /// Returns [`FlowError::NotFound`] when the scope is not active,
246        /// [`FlowError::AlreadyExists`] when the name is already in use on
247        /// that scope, or an internal error if the runtime owner check fails.
248        pub fn $register_name(
249            scope_uuid: &uuid::Uuid,
250            name: &str,
251            priority: i32,
252            guardrail: $fn_type,
253        ) -> Result<()> {
254            ensure_runtime_owner()?;
255            let scope_stack = current_scope_stack();
256            let mut guard = scope_stack.write().expect("scope stack lock poisoned");
257            let registries = guard
258                .local_registries_mut(scope_uuid)
259                .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
260            registries
261                .$field
262                .register(name.to_string(), GuardrailEntry { priority, guardrail })
263                .map_err(FlowError::AlreadyExists)
264        }
265
266        $(#[$deregister_meta])*
267        ///
268        /// # Parameters
269        /// - `scope_uuid`: UUID of the active scope that owns the middleware.
270        /// - `name`: Scope-local middleware name to remove.
271        ///
272        /// # Returns
273        /// A [`Result`] containing `true` when a guardrail was removed and
274        /// `false` when the name was not registered on that scope.
275        ///
276        /// # Errors
277        /// Returns [`FlowError::NotFound`] when the scope is not active or an
278        /// internal error if the runtime owner check fails.
279        pub fn $deregister_name(scope_uuid: &uuid::Uuid, name: &str) -> Result<bool> {
280            ensure_runtime_owner()?;
281            let scope_stack = current_scope_stack();
282            let mut guard = scope_stack.write().expect("scope stack lock poisoned");
283            let registries = guard
284                .local_registries_mut(scope_uuid)
285                .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
286            Ok(registries.$field.deregister(name))
287        }
288    };
289}
290
291macro_rules! scope_intercept_registry_api {
292    (
293        $(#[$register_meta:meta])*
294        $register_name:ident,
295        $(#[$deregister_meta:meta])*
296        $deregister_name:ident,
297        $field:ident,
298        $fn_type:ty
299    ) => {
300        $(#[$register_meta])*
301        ///
302        /// # Parameters
303        /// - `scope_uuid`: UUID of the active scope that owns the middleware.
304        /// - `name`: Unique middleware name within that scope.
305        /// - `priority`: Lower values run earlier in the chain.
306        /// - `break_chain`: Whether the intercept should stop later request
307        ///   intercepts after it returns.
308        /// - `callable`: Intercept callback stored under `name`.
309        ///
310        /// # Returns
311        /// A [`Result`] that is `Ok(())` when the intercept was registered.
312        ///
313        /// # Errors
314        /// Returns [`FlowError::NotFound`] when the scope is not active,
315        /// [`FlowError::AlreadyExists`] when the name is already in use on
316        /// that scope, or an internal error if the runtime owner check fails.
317        pub fn $register_name(
318            scope_uuid: &uuid::Uuid,
319            name: &str,
320            priority: i32,
321            break_chain: bool,
322            callable: $fn_type,
323        ) -> Result<()> {
324            ensure_runtime_owner()?;
325            let scope_stack = current_scope_stack();
326            let mut guard = scope_stack.write().expect("scope stack lock poisoned");
327            let registries = guard
328                .local_registries_mut(scope_uuid)
329                .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
330            registries
331                .$field
332                .register(
333                    name.to_string(),
334                    Intercept {
335                        priority,
336                        break_chain,
337                        callable,
338                    },
339                )
340                .map_err(FlowError::AlreadyExists)
341        }
342
343        $(#[$deregister_meta])*
344        ///
345        /// # Parameters
346        /// - `scope_uuid`: UUID of the active scope that owns the middleware.
347        /// - `name`: Scope-local middleware name to remove.
348        ///
349        /// # Returns
350        /// A [`Result`] containing `true` when an intercept was removed and
351        /// `false` when the name was not registered on that scope.
352        ///
353        /// # Errors
354        /// Returns [`FlowError::NotFound`] when the scope is not active or an
355        /// internal error if the runtime owner check fails.
356        pub fn $deregister_name(scope_uuid: &uuid::Uuid, name: &str) -> Result<bool> {
357            ensure_runtime_owner()?;
358            let scope_stack = current_scope_stack();
359            let mut guard = scope_stack.write().expect("scope stack lock poisoned");
360            let registries = guard
361                .local_registries_mut(scope_uuid)
362                .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
363            Ok(registries.$field.deregister(name))
364        }
365    };
366}
367
368macro_rules! scope_execution_registry_api {
369    (
370        $(#[$register_meta:meta])*
371        $register_name:ident,
372        $(#[$deregister_meta:meta])*
373        $deregister_name:ident,
374        $field:ident,
375        $fn_type:ty
376    ) => {
377        $(#[$register_meta])*
378        ///
379        /// # Parameters
380        /// - `scope_uuid`: UUID of the active scope that owns the middleware.
381        /// - `name`: Unique middleware name within that scope.
382        /// - `priority`: Lower values run earlier in the chain.
383        /// - `callable`: Execution intercept callback stored under `name`.
384        ///
385        /// # Returns
386        /// A [`Result`] that is `Ok(())` when the intercept was registered.
387        ///
388        /// # Errors
389        /// Returns [`FlowError::NotFound`] when the scope is not active,
390        /// [`FlowError::AlreadyExists`] when the name is already in use on
391        /// that scope, or an internal error if the runtime owner check fails.
392        pub fn $register_name(
393            scope_uuid: &uuid::Uuid,
394            name: &str,
395            priority: i32,
396            callable: $fn_type,
397        ) -> Result<()> {
398            ensure_runtime_owner()?;
399            let scope_stack = current_scope_stack();
400            let mut guard = scope_stack.write().expect("scope stack lock poisoned");
401            let registries = guard
402                .local_registries_mut(scope_uuid)
403                .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
404            registries
405                .$field
406                .register(name.to_string(), ExecutionIntercept { priority, callable })
407                .map_err(FlowError::AlreadyExists)
408        }
409
410        $(#[$deregister_meta])*
411        ///
412        /// # Parameters
413        /// - `scope_uuid`: UUID of the active scope that owns the middleware.
414        /// - `name`: Scope-local middleware name to remove.
415        ///
416        /// # Returns
417        /// A [`Result`] containing `true` when an execution intercept was
418        /// removed and `false` when the name was not registered on that scope.
419        ///
420        /// # Errors
421        /// Returns [`FlowError::NotFound`] when the scope is not active or an
422        /// internal error if the runtime owner check fails.
423        pub fn $deregister_name(scope_uuid: &uuid::Uuid, name: &str) -> Result<bool> {
424            ensure_runtime_owner()?;
425            let scope_stack = current_scope_stack();
426            let mut guard = scope_stack.write().expect("scope stack lock poisoned");
427            let registries = guard
428                .local_registries_mut(scope_uuid)
429                .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
430            Ok(registries.$field.deregister(name))
431        }
432    };
433}
434
435global_guardrail_registry_api!(
436    /// Register a global tool sanitize-request guardrail.
437    /// The guardrail rewrites only the tool input recorded on emitted start
438    /// events.
439    register_tool_sanitize_request_guardrail,
440    /// Deregister a global tool sanitize-request guardrail.
441    deregister_tool_sanitize_request_guardrail,
442    tool_sanitize_request_guardrails,
443    ToolSanitizeFn
444);
445global_guardrail_registry_api!(
446    /// Register a global tool sanitize-response guardrail.
447    /// The guardrail rewrites only the tool output recorded on emitted end
448    /// events.
449    register_tool_sanitize_response_guardrail,
450    /// Deregister a global tool sanitize-response guardrail.
451    deregister_tool_sanitize_response_guardrail,
452    tool_sanitize_response_guardrails,
453    ToolSanitizeFn
454);
455global_guardrail_registry_api!(
456    /// Register a global tool conditional-execution guardrail.
457    /// The guardrail can block tool execution before intercepts or the tool
458    /// callback run.
459    register_tool_conditional_execution_guardrail,
460    /// Deregister a global tool conditional-execution guardrail.
461    deregister_tool_conditional_execution_guardrail,
462    tool_conditional_execution_guardrails,
463    ToolConditionalFn
464);
465global_intercept_registry_api!(
466    /// Register a global tool request intercept.
467    /// Request intercepts can rewrite tool arguments before execution.
468    register_tool_request_intercept,
469    /// Deregister a global tool request intercept.
470    deregister_tool_request_intercept,
471    tool_request_intercepts,
472    ToolInterceptFn
473);
474global_execution_registry_api!(
475    /// Register a global tool execution intercept.
476    /// Execution intercepts can wrap or replace the tool callback.
477    register_tool_execution_intercept,
478    /// Deregister a global tool execution intercept.
479    deregister_tool_execution_intercept,
480    tool_execution_intercepts,
481    ToolExecutionFn
482);
483
484global_guardrail_registry_api!(
485    /// Register a global LLM sanitize-request guardrail.
486    /// The guardrail rewrites only the request payload recorded on emitted
487    /// start events.
488    register_llm_sanitize_request_guardrail,
489    /// Deregister a global LLM sanitize-request guardrail.
490    deregister_llm_sanitize_request_guardrail,
491    llm_sanitize_request_guardrails,
492    LlmSanitizeRequestFn
493);
494global_guardrail_registry_api!(
495    /// Register a global LLM sanitize-response guardrail.
496    /// The guardrail rewrites only the response payload recorded on emitted
497    /// end events.
498    register_llm_sanitize_response_guardrail,
499    /// Deregister a global LLM sanitize-response guardrail.
500    deregister_llm_sanitize_response_guardrail,
501    llm_sanitize_response_guardrails,
502    LlmSanitizeResponseFn
503);
504global_guardrail_registry_api!(
505    /// Register a global LLM conditional-execution guardrail.
506    /// The guardrail can block LLM execution before intercepts or the provider
507    /// callback run.
508    register_llm_conditional_execution_guardrail,
509    /// Deregister a global LLM conditional-execution guardrail.
510    deregister_llm_conditional_execution_guardrail,
511    llm_conditional_execution_guardrails,
512    LlmConditionalFn
513);
514global_intercept_registry_api!(
515    /// Register a global LLM request intercept.
516    /// Request intercepts can rewrite or annotate the outgoing LLM request.
517    register_llm_request_intercept,
518    /// Deregister a global LLM request intercept.
519    deregister_llm_request_intercept,
520    llm_request_intercepts,
521    LlmRequestInterceptFn
522);
523global_execution_registry_api!(
524    /// Register a global LLM execution intercept.
525    /// Execution intercepts can wrap or replace the non-streaming provider
526    /// callback.
527    register_llm_execution_intercept,
528    /// Deregister a global LLM execution intercept.
529    deregister_llm_execution_intercept,
530    llm_execution_intercepts,
531    LlmExecutionFn
532);
533global_execution_registry_api!(
534    /// Register a global streaming LLM execution intercept.
535    /// Execution intercepts can wrap or replace the streaming provider
536    /// callback.
537    register_llm_stream_execution_intercept,
538    /// Deregister a global streaming LLM execution intercept.
539    deregister_llm_stream_execution_intercept,
540    llm_stream_execution_intercepts,
541    LlmStreamExecutionFn
542);
543
544scope_guardrail_registry_api!(
545    /// Register a scope-local tool sanitize-request guardrail.
546    /// The guardrail rewrites only tool input emitted under the owning scope.
547    scope_register_tool_sanitize_request_guardrail,
548    /// Deregister a scope-local tool sanitize-request guardrail.
549    scope_deregister_tool_sanitize_request_guardrail,
550    tool_sanitize_request_guardrails,
551    ToolSanitizeFn
552);
553scope_guardrail_registry_api!(
554    /// Register a scope-local tool sanitize-response guardrail.
555    /// The guardrail rewrites only tool output emitted under the owning scope.
556    scope_register_tool_sanitize_response_guardrail,
557    /// Deregister a scope-local tool sanitize-response guardrail.
558    scope_deregister_tool_sanitize_response_guardrail,
559    tool_sanitize_response_guardrails,
560    ToolSanitizeFn
561);
562scope_guardrail_registry_api!(
563    /// Register a scope-local tool conditional-execution guardrail.
564    /// The guardrail can block tool execution inside the owning scope.
565    scope_register_tool_conditional_execution_guardrail,
566    /// Deregister a scope-local tool conditional-execution guardrail.
567    scope_deregister_tool_conditional_execution_guardrail,
568    tool_conditional_execution_guardrails,
569    ToolConditionalFn
570);
571scope_intercept_registry_api!(
572    /// Register a scope-local tool request intercept.
573    /// Request intercepts can rewrite tool arguments inside the owning scope.
574    scope_register_tool_request_intercept,
575    /// Deregister a scope-local tool request intercept.
576    scope_deregister_tool_request_intercept,
577    tool_request_intercepts,
578    ToolInterceptFn
579);
580scope_execution_registry_api!(
581    /// Register a scope-local tool execution intercept.
582    /// Execution intercepts can wrap or replace the tool callback inside the
583    /// owning scope.
584    scope_register_tool_execution_intercept,
585    /// Deregister a scope-local tool execution intercept.
586    scope_deregister_tool_execution_intercept,
587    tool_execution_intercepts,
588    ToolExecutionFn
589);
590
591scope_guardrail_registry_api!(
592    /// Register a scope-local LLM sanitize-request guardrail.
593    /// The guardrail rewrites only request payloads emitted under the owning
594    /// scope.
595    scope_register_llm_sanitize_request_guardrail,
596    /// Deregister a scope-local LLM sanitize-request guardrail.
597    scope_deregister_llm_sanitize_request_guardrail,
598    llm_sanitize_request_guardrails,
599    LlmSanitizeRequestFn
600);
601scope_guardrail_registry_api!(
602    /// Register a scope-local LLM sanitize-response guardrail.
603    /// The guardrail rewrites only response payloads emitted under the owning
604    /// scope.
605    scope_register_llm_sanitize_response_guardrail,
606    /// Deregister a scope-local LLM sanitize-response guardrail.
607    scope_deregister_llm_sanitize_response_guardrail,
608    llm_sanitize_response_guardrails,
609    LlmSanitizeResponseFn
610);
611scope_guardrail_registry_api!(
612    /// Register a scope-local LLM conditional-execution guardrail.
613    /// The guardrail can block LLM execution inside the owning scope.
614    scope_register_llm_conditional_execution_guardrail,
615    /// Deregister a scope-local LLM conditional-execution guardrail.
616    scope_deregister_llm_conditional_execution_guardrail,
617    llm_conditional_execution_guardrails,
618    LlmConditionalFn
619);
620scope_intercept_registry_api!(
621    /// Register a scope-local LLM request intercept.
622    /// Request intercepts can rewrite or annotate LLM requests inside the
623    /// owning scope.
624    scope_register_llm_request_intercept,
625    /// Deregister a scope-local LLM request intercept.
626    scope_deregister_llm_request_intercept,
627    llm_request_intercepts,
628    LlmRequestInterceptFn
629);
630scope_execution_registry_api!(
631    /// Register a scope-local LLM execution intercept.
632    /// Execution intercepts can wrap or replace the non-streaming provider
633    /// callback inside the owning scope.
634    scope_register_llm_execution_intercept,
635    /// Deregister a scope-local LLM execution intercept.
636    scope_deregister_llm_execution_intercept,
637    llm_execution_intercepts,
638    LlmExecutionFn
639);
640scope_execution_registry_api!(
641    /// Register a scope-local streaming LLM execution intercept.
642    /// Execution intercepts can wrap or replace the streaming provider
643    /// callback inside the owning scope.
644    scope_register_llm_stream_execution_intercept,
645    /// Deregister a scope-local streaming LLM execution intercept.
646    scope_deregister_llm_stream_execution_intercept,
647    llm_stream_execution_intercepts,
648    LlmStreamExecutionFn
649);