nemo_flow/api/registry.rs
1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Middleware registry helpers for global and scope-local guardrails,
5//! intercepts, and subscribers.
6
7use crate::api::runtime::{
8 LlmConditionalFn, LlmExecutionFn, LlmRequestInterceptFn, LlmSanitizeRequestFn,
9 LlmSanitizeResponseFn, LlmStreamExecutionFn, ToolConditionalFn, ToolExecutionFn,
10 ToolInterceptFn, ToolSanitizeFn,
11};
12use crate::api::runtime::{current_scope_stack, global_context};
13use crate::api::shared::ensure_runtime_owner;
14use crate::error::{FlowError, Result};
15
16/// A priority-ordered request intercept registration entry.
17pub struct Intercept<F> {
18 /// Lower values run earlier in the chain.
19 pub priority: i32,
20 /// Whether this intercept stops later request intercepts after it returns.
21 pub break_chain: bool,
22 /// The caller-provided intercept callback.
23 pub callable: F,
24}
25
26/// A priority-ordered execution intercept registration entry.
27pub struct ExecutionIntercept<F> {
28 /// Lower values run earlier in the chain.
29 pub priority: i32,
30 /// The caller-provided execution intercept callback.
31 pub callable: F,
32}
33
34/// A priority-ordered guardrail registration entry.
35pub struct GuardrailEntry<F> {
36 /// Lower values run earlier in the chain.
37 pub priority: i32,
38 /// The caller-provided guardrail callback.
39 pub guardrail: F,
40}
41
42macro_rules! global_guardrail_registry_api {
43 (
44 $(#[$register_meta:meta])*
45 $register_name:ident,
46 $(#[$deregister_meta:meta])*
47 $deregister_name:ident,
48 $field:ident,
49 $fn_type:ty
50 ) => {
51 $(#[$register_meta])*
52 ///
53 /// # Parameters
54 /// - `name`: Unique middleware name in the global registry.
55 /// - `priority`: Lower values run earlier in the chain.
56 /// - `guardrail`: Guardrail callback stored under `name`.
57 ///
58 /// # Returns
59 /// A [`Result`] that is `Ok(())` when the guardrail was registered.
60 ///
61 /// # Errors
62 /// Returns [`FlowError::AlreadyExists`] when the name is already in
63 /// use or an internal error if the runtime state cannot be updated.
64 pub fn $register_name(name: &str, priority: i32, guardrail: $fn_type) -> Result<()> {
65 ensure_runtime_owner()?;
66 let context = global_context();
67 let mut state = context
68 .write()
69 .map_err(|error| FlowError::Internal(error.to_string()))?;
70 state
71 .$field
72 .register(name.to_string(), GuardrailEntry { priority, guardrail })
73 .map_err(FlowError::AlreadyExists)
74 }
75
76 $(#[$deregister_meta])*
77 ///
78 /// # Parameters
79 /// - `name`: Global middleware name to remove.
80 ///
81 /// # Returns
82 /// A [`Result`] containing `true` when a guardrail was removed and
83 /// `false` when the name was not registered.
84 ///
85 /// # Errors
86 /// Returns an internal error if the runtime state cannot be updated.
87 pub fn $deregister_name(name: &str) -> Result<bool> {
88 ensure_runtime_owner()?;
89 let context = global_context();
90 let mut state = context
91 .write()
92 .map_err(|error| FlowError::Internal(error.to_string()))?;
93 Ok(state.$field.deregister(name))
94 }
95 };
96}
97
98macro_rules! global_intercept_registry_api {
99 (
100 $(#[$register_meta:meta])*
101 $register_name:ident,
102 $(#[$deregister_meta:meta])*
103 $deregister_name:ident,
104 $field:ident,
105 $fn_type:ty
106 ) => {
107 $(#[$register_meta])*
108 ///
109 /// # Parameters
110 /// - `name`: Unique middleware name in the global registry.
111 /// - `priority`: Lower values run earlier in the chain.
112 /// - `break_chain`: Whether the intercept should stop later request
113 /// intercepts after it returns.
114 /// - `callable`: Intercept callback stored under `name`.
115 ///
116 /// # Returns
117 /// A [`Result`] that is `Ok(())` when the intercept was registered.
118 ///
119 /// # Errors
120 /// Returns [`FlowError::AlreadyExists`] when the name is already in
121 /// use or an internal error if the runtime state cannot be updated.
122 pub fn $register_name(
123 name: &str,
124 priority: i32,
125 break_chain: bool,
126 callable: $fn_type,
127 ) -> Result<()> {
128 ensure_runtime_owner()?;
129 let context = global_context();
130 let mut state = context
131 .write()
132 .map_err(|error| FlowError::Internal(error.to_string()))?;
133 state
134 .$field
135 .register(
136 name.to_string(),
137 Intercept {
138 priority,
139 break_chain,
140 callable,
141 },
142 )
143 .map_err(FlowError::AlreadyExists)
144 }
145
146 $(#[$deregister_meta])*
147 ///
148 /// # Parameters
149 /// - `name`: Global middleware name to remove.
150 ///
151 /// # Returns
152 /// A [`Result`] containing `true` when an intercept was removed and
153 /// `false` when the name was not registered.
154 ///
155 /// # Errors
156 /// Returns an internal error if the runtime state cannot be updated.
157 pub fn $deregister_name(name: &str) -> Result<bool> {
158 ensure_runtime_owner()?;
159 let context = global_context();
160 let mut state = context
161 .write()
162 .map_err(|error| FlowError::Internal(error.to_string()))?;
163 Ok(state.$field.deregister(name))
164 }
165 };
166}
167
168macro_rules! global_execution_registry_api {
169 (
170 $(#[$register_meta:meta])*
171 $register_name:ident,
172 $(#[$deregister_meta:meta])*
173 $deregister_name:ident,
174 $field:ident,
175 $fn_type:ty
176 ) => {
177 $(#[$register_meta])*
178 ///
179 /// # Parameters
180 /// - `name`: Unique middleware name in the global registry.
181 /// - `priority`: Lower values run earlier in the chain.
182 /// - `callable`: Execution intercept callback stored under `name`.
183 ///
184 /// # Returns
185 /// A [`Result`] that is `Ok(())` when the intercept was registered.
186 ///
187 /// # Errors
188 /// Returns [`FlowError::AlreadyExists`] when the name is already in
189 /// use or an internal error if the runtime state cannot be updated.
190 pub fn $register_name(name: &str, priority: i32, callable: $fn_type) -> Result<()> {
191 ensure_runtime_owner()?;
192 let context = global_context();
193 let mut state = context
194 .write()
195 .map_err(|error| FlowError::Internal(error.to_string()))?;
196 state
197 .$field
198 .register(name.to_string(), ExecutionIntercept { priority, callable })
199 .map_err(FlowError::AlreadyExists)
200 }
201
202 $(#[$deregister_meta])*
203 ///
204 /// # Parameters
205 /// - `name`: Global middleware name to remove.
206 ///
207 /// # Returns
208 /// A [`Result`] containing `true` when an execution intercept was
209 /// removed and `false` when the name was not registered.
210 ///
211 /// # Errors
212 /// Returns an internal error if the runtime state cannot be updated.
213 pub fn $deregister_name(name: &str) -> Result<bool> {
214 ensure_runtime_owner()?;
215 let context = global_context();
216 let mut state = context
217 .write()
218 .map_err(|error| FlowError::Internal(error.to_string()))?;
219 Ok(state.$field.deregister(name))
220 }
221 };
222}
223
224macro_rules! scope_guardrail_registry_api {
225 (
226 $(#[$register_meta:meta])*
227 $register_name:ident,
228 $(#[$deregister_meta:meta])*
229 $deregister_name:ident,
230 $field:ident,
231 $fn_type:ty
232 ) => {
233 $(#[$register_meta])*
234 ///
235 /// # Parameters
236 /// - `scope_uuid`: UUID of the active scope that owns the middleware.
237 /// - `name`: Unique middleware name within that scope.
238 /// - `priority`: Lower values run earlier in the chain.
239 /// - `guardrail`: Guardrail callback stored under `name`.
240 ///
241 /// # Returns
242 /// A [`Result`] that is `Ok(())` when the guardrail was registered.
243 ///
244 /// # Errors
245 /// Returns [`FlowError::NotFound`] when the scope is not active,
246 /// [`FlowError::AlreadyExists`] when the name is already in use on
247 /// that scope, or an internal error if the runtime owner check fails.
248 pub fn $register_name(
249 scope_uuid: &uuid::Uuid,
250 name: &str,
251 priority: i32,
252 guardrail: $fn_type,
253 ) -> Result<()> {
254 ensure_runtime_owner()?;
255 let scope_stack = current_scope_stack();
256 let mut guard = scope_stack.write().expect("scope stack lock poisoned");
257 let registries = guard
258 .local_registries_mut(scope_uuid)
259 .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
260 registries
261 .$field
262 .register(name.to_string(), GuardrailEntry { priority, guardrail })
263 .map_err(FlowError::AlreadyExists)
264 }
265
266 $(#[$deregister_meta])*
267 ///
268 /// # Parameters
269 /// - `scope_uuid`: UUID of the active scope that owns the middleware.
270 /// - `name`: Scope-local middleware name to remove.
271 ///
272 /// # Returns
273 /// A [`Result`] containing `true` when a guardrail was removed and
274 /// `false` when the name was not registered on that scope.
275 ///
276 /// # Errors
277 /// Returns [`FlowError::NotFound`] when the scope is not active or an
278 /// internal error if the runtime owner check fails.
279 pub fn $deregister_name(scope_uuid: &uuid::Uuid, name: &str) -> Result<bool> {
280 ensure_runtime_owner()?;
281 let scope_stack = current_scope_stack();
282 let mut guard = scope_stack.write().expect("scope stack lock poisoned");
283 let registries = guard
284 .local_registries_mut(scope_uuid)
285 .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
286 Ok(registries.$field.deregister(name))
287 }
288 };
289}
290
291macro_rules! scope_intercept_registry_api {
292 (
293 $(#[$register_meta:meta])*
294 $register_name:ident,
295 $(#[$deregister_meta:meta])*
296 $deregister_name:ident,
297 $field:ident,
298 $fn_type:ty
299 ) => {
300 $(#[$register_meta])*
301 ///
302 /// # Parameters
303 /// - `scope_uuid`: UUID of the active scope that owns the middleware.
304 /// - `name`: Unique middleware name within that scope.
305 /// - `priority`: Lower values run earlier in the chain.
306 /// - `break_chain`: Whether the intercept should stop later request
307 /// intercepts after it returns.
308 /// - `callable`: Intercept callback stored under `name`.
309 ///
310 /// # Returns
311 /// A [`Result`] that is `Ok(())` when the intercept was registered.
312 ///
313 /// # Errors
314 /// Returns [`FlowError::NotFound`] when the scope is not active,
315 /// [`FlowError::AlreadyExists`] when the name is already in use on
316 /// that scope, or an internal error if the runtime owner check fails.
317 pub fn $register_name(
318 scope_uuid: &uuid::Uuid,
319 name: &str,
320 priority: i32,
321 break_chain: bool,
322 callable: $fn_type,
323 ) -> Result<()> {
324 ensure_runtime_owner()?;
325 let scope_stack = current_scope_stack();
326 let mut guard = scope_stack.write().expect("scope stack lock poisoned");
327 let registries = guard
328 .local_registries_mut(scope_uuid)
329 .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
330 registries
331 .$field
332 .register(
333 name.to_string(),
334 Intercept {
335 priority,
336 break_chain,
337 callable,
338 },
339 )
340 .map_err(FlowError::AlreadyExists)
341 }
342
343 $(#[$deregister_meta])*
344 ///
345 /// # Parameters
346 /// - `scope_uuid`: UUID of the active scope that owns the middleware.
347 /// - `name`: Scope-local middleware name to remove.
348 ///
349 /// # Returns
350 /// A [`Result`] containing `true` when an intercept was removed and
351 /// `false` when the name was not registered on that scope.
352 ///
353 /// # Errors
354 /// Returns [`FlowError::NotFound`] when the scope is not active or an
355 /// internal error if the runtime owner check fails.
356 pub fn $deregister_name(scope_uuid: &uuid::Uuid, name: &str) -> Result<bool> {
357 ensure_runtime_owner()?;
358 let scope_stack = current_scope_stack();
359 let mut guard = scope_stack.write().expect("scope stack lock poisoned");
360 let registries = guard
361 .local_registries_mut(scope_uuid)
362 .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
363 Ok(registries.$field.deregister(name))
364 }
365 };
366}
367
368macro_rules! scope_execution_registry_api {
369 (
370 $(#[$register_meta:meta])*
371 $register_name:ident,
372 $(#[$deregister_meta:meta])*
373 $deregister_name:ident,
374 $field:ident,
375 $fn_type:ty
376 ) => {
377 $(#[$register_meta])*
378 ///
379 /// # Parameters
380 /// - `scope_uuid`: UUID of the active scope that owns the middleware.
381 /// - `name`: Unique middleware name within that scope.
382 /// - `priority`: Lower values run earlier in the chain.
383 /// - `callable`: Execution intercept callback stored under `name`.
384 ///
385 /// # Returns
386 /// A [`Result`] that is `Ok(())` when the intercept was registered.
387 ///
388 /// # Errors
389 /// Returns [`FlowError::NotFound`] when the scope is not active,
390 /// [`FlowError::AlreadyExists`] when the name is already in use on
391 /// that scope, or an internal error if the runtime owner check fails.
392 pub fn $register_name(
393 scope_uuid: &uuid::Uuid,
394 name: &str,
395 priority: i32,
396 callable: $fn_type,
397 ) -> Result<()> {
398 ensure_runtime_owner()?;
399 let scope_stack = current_scope_stack();
400 let mut guard = scope_stack.write().expect("scope stack lock poisoned");
401 let registries = guard
402 .local_registries_mut(scope_uuid)
403 .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
404 registries
405 .$field
406 .register(name.to_string(), ExecutionIntercept { priority, callable })
407 .map_err(FlowError::AlreadyExists)
408 }
409
410 $(#[$deregister_meta])*
411 ///
412 /// # Parameters
413 /// - `scope_uuid`: UUID of the active scope that owns the middleware.
414 /// - `name`: Scope-local middleware name to remove.
415 ///
416 /// # Returns
417 /// A [`Result`] containing `true` when an execution intercept was
418 /// removed and `false` when the name was not registered on that scope.
419 ///
420 /// # Errors
421 /// Returns [`FlowError::NotFound`] when the scope is not active or an
422 /// internal error if the runtime owner check fails.
423 pub fn $deregister_name(scope_uuid: &uuid::Uuid, name: &str) -> Result<bool> {
424 ensure_runtime_owner()?;
425 let scope_stack = current_scope_stack();
426 let mut guard = scope_stack.write().expect("scope stack lock poisoned");
427 let registries = guard
428 .local_registries_mut(scope_uuid)
429 .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
430 Ok(registries.$field.deregister(name))
431 }
432 };
433}
434
435global_guardrail_registry_api!(
436 /// Register a global tool sanitize-request guardrail.
437 /// The guardrail rewrites only the tool input recorded on emitted start
438 /// events.
439 register_tool_sanitize_request_guardrail,
440 /// Deregister a global tool sanitize-request guardrail.
441 deregister_tool_sanitize_request_guardrail,
442 tool_sanitize_request_guardrails,
443 ToolSanitizeFn
444);
445global_guardrail_registry_api!(
446 /// Register a global tool sanitize-response guardrail.
447 /// The guardrail rewrites only the tool output recorded on emitted end
448 /// events.
449 register_tool_sanitize_response_guardrail,
450 /// Deregister a global tool sanitize-response guardrail.
451 deregister_tool_sanitize_response_guardrail,
452 tool_sanitize_response_guardrails,
453 ToolSanitizeFn
454);
455global_guardrail_registry_api!(
456 /// Register a global tool conditional-execution guardrail.
457 /// The guardrail can block tool execution before intercepts or the tool
458 /// callback run.
459 register_tool_conditional_execution_guardrail,
460 /// Deregister a global tool conditional-execution guardrail.
461 deregister_tool_conditional_execution_guardrail,
462 tool_conditional_execution_guardrails,
463 ToolConditionalFn
464);
465global_intercept_registry_api!(
466 /// Register a global tool request intercept.
467 /// Request intercepts can rewrite tool arguments before execution.
468 register_tool_request_intercept,
469 /// Deregister a global tool request intercept.
470 deregister_tool_request_intercept,
471 tool_request_intercepts,
472 ToolInterceptFn
473);
474global_execution_registry_api!(
475 /// Register a global tool execution intercept.
476 /// Execution intercepts can wrap or replace the tool callback.
477 register_tool_execution_intercept,
478 /// Deregister a global tool execution intercept.
479 deregister_tool_execution_intercept,
480 tool_execution_intercepts,
481 ToolExecutionFn
482);
483
484global_guardrail_registry_api!(
485 /// Register a global LLM sanitize-request guardrail.
486 /// The guardrail rewrites only the request payload recorded on emitted
487 /// start events.
488 register_llm_sanitize_request_guardrail,
489 /// Deregister a global LLM sanitize-request guardrail.
490 deregister_llm_sanitize_request_guardrail,
491 llm_sanitize_request_guardrails,
492 LlmSanitizeRequestFn
493);
494global_guardrail_registry_api!(
495 /// Register a global LLM sanitize-response guardrail.
496 /// The guardrail rewrites only the response payload recorded on emitted
497 /// end events.
498 register_llm_sanitize_response_guardrail,
499 /// Deregister a global LLM sanitize-response guardrail.
500 deregister_llm_sanitize_response_guardrail,
501 llm_sanitize_response_guardrails,
502 LlmSanitizeResponseFn
503);
504global_guardrail_registry_api!(
505 /// Register a global LLM conditional-execution guardrail.
506 /// The guardrail can block LLM execution before intercepts or the provider
507 /// callback run.
508 register_llm_conditional_execution_guardrail,
509 /// Deregister a global LLM conditional-execution guardrail.
510 deregister_llm_conditional_execution_guardrail,
511 llm_conditional_execution_guardrails,
512 LlmConditionalFn
513);
514global_intercept_registry_api!(
515 /// Register a global LLM request intercept.
516 /// Request intercepts can rewrite or annotate the outgoing LLM request.
517 register_llm_request_intercept,
518 /// Deregister a global LLM request intercept.
519 deregister_llm_request_intercept,
520 llm_request_intercepts,
521 LlmRequestInterceptFn
522);
523global_execution_registry_api!(
524 /// Register a global LLM execution intercept.
525 /// Execution intercepts can wrap or replace the non-streaming provider
526 /// callback.
527 register_llm_execution_intercept,
528 /// Deregister a global LLM execution intercept.
529 deregister_llm_execution_intercept,
530 llm_execution_intercepts,
531 LlmExecutionFn
532);
533global_execution_registry_api!(
534 /// Register a global streaming LLM execution intercept.
535 /// Execution intercepts can wrap or replace the streaming provider
536 /// callback.
537 register_llm_stream_execution_intercept,
538 /// Deregister a global streaming LLM execution intercept.
539 deregister_llm_stream_execution_intercept,
540 llm_stream_execution_intercepts,
541 LlmStreamExecutionFn
542);
543
544scope_guardrail_registry_api!(
545 /// Register a scope-local tool sanitize-request guardrail.
546 /// The guardrail rewrites only tool input emitted under the owning scope.
547 scope_register_tool_sanitize_request_guardrail,
548 /// Deregister a scope-local tool sanitize-request guardrail.
549 scope_deregister_tool_sanitize_request_guardrail,
550 tool_sanitize_request_guardrails,
551 ToolSanitizeFn
552);
553scope_guardrail_registry_api!(
554 /// Register a scope-local tool sanitize-response guardrail.
555 /// The guardrail rewrites only tool output emitted under the owning scope.
556 scope_register_tool_sanitize_response_guardrail,
557 /// Deregister a scope-local tool sanitize-response guardrail.
558 scope_deregister_tool_sanitize_response_guardrail,
559 tool_sanitize_response_guardrails,
560 ToolSanitizeFn
561);
562scope_guardrail_registry_api!(
563 /// Register a scope-local tool conditional-execution guardrail.
564 /// The guardrail can block tool execution inside the owning scope.
565 scope_register_tool_conditional_execution_guardrail,
566 /// Deregister a scope-local tool conditional-execution guardrail.
567 scope_deregister_tool_conditional_execution_guardrail,
568 tool_conditional_execution_guardrails,
569 ToolConditionalFn
570);
571scope_intercept_registry_api!(
572 /// Register a scope-local tool request intercept.
573 /// Request intercepts can rewrite tool arguments inside the owning scope.
574 scope_register_tool_request_intercept,
575 /// Deregister a scope-local tool request intercept.
576 scope_deregister_tool_request_intercept,
577 tool_request_intercepts,
578 ToolInterceptFn
579);
580scope_execution_registry_api!(
581 /// Register a scope-local tool execution intercept.
582 /// Execution intercepts can wrap or replace the tool callback inside the
583 /// owning scope.
584 scope_register_tool_execution_intercept,
585 /// Deregister a scope-local tool execution intercept.
586 scope_deregister_tool_execution_intercept,
587 tool_execution_intercepts,
588 ToolExecutionFn
589);
590
591scope_guardrail_registry_api!(
592 /// Register a scope-local LLM sanitize-request guardrail.
593 /// The guardrail rewrites only request payloads emitted under the owning
594 /// scope.
595 scope_register_llm_sanitize_request_guardrail,
596 /// Deregister a scope-local LLM sanitize-request guardrail.
597 scope_deregister_llm_sanitize_request_guardrail,
598 llm_sanitize_request_guardrails,
599 LlmSanitizeRequestFn
600);
601scope_guardrail_registry_api!(
602 /// Register a scope-local LLM sanitize-response guardrail.
603 /// The guardrail rewrites only response payloads emitted under the owning
604 /// scope.
605 scope_register_llm_sanitize_response_guardrail,
606 /// Deregister a scope-local LLM sanitize-response guardrail.
607 scope_deregister_llm_sanitize_response_guardrail,
608 llm_sanitize_response_guardrails,
609 LlmSanitizeResponseFn
610);
611scope_guardrail_registry_api!(
612 /// Register a scope-local LLM conditional-execution guardrail.
613 /// The guardrail can block LLM execution inside the owning scope.
614 scope_register_llm_conditional_execution_guardrail,
615 /// Deregister a scope-local LLM conditional-execution guardrail.
616 scope_deregister_llm_conditional_execution_guardrail,
617 llm_conditional_execution_guardrails,
618 LlmConditionalFn
619);
620scope_intercept_registry_api!(
621 /// Register a scope-local LLM request intercept.
622 /// Request intercepts can rewrite or annotate LLM requests inside the
623 /// owning scope.
624 scope_register_llm_request_intercept,
625 /// Deregister a scope-local LLM request intercept.
626 scope_deregister_llm_request_intercept,
627 llm_request_intercepts,
628 LlmRequestInterceptFn
629);
630scope_execution_registry_api!(
631 /// Register a scope-local LLM execution intercept.
632 /// Execution intercepts can wrap or replace the non-streaming provider
633 /// callback inside the owning scope.
634 scope_register_llm_execution_intercept,
635 /// Deregister a scope-local LLM execution intercept.
636 scope_deregister_llm_execution_intercept,
637 llm_execution_intercepts,
638 LlmExecutionFn
639);
640scope_execution_registry_api!(
641 /// Register a scope-local streaming LLM execution intercept.
642 /// Execution intercepts can wrap or replace the streaming provider
643 /// callback inside the owning scope.
644 scope_register_llm_stream_execution_intercept,
645 /// Deregister a scope-local streaming LLM execution intercept.
646 scope_deregister_llm_stream_execution_intercept,
647 llm_stream_execution_intercepts,
648 LlmStreamExecutionFn
649);