nemo_flow/api/subscriber.rs
1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::api::runtime::EventSubscriberFn;
5use crate::api::runtime::current_scope_stack;
6use crate::api::runtime::global_context;
7use crate::api::shared::ensure_runtime_owner;
8use crate::error::{FlowError, Result};
9
10/// Register a global lifecycle event subscriber.
11///
12/// The subscriber is added to the process-wide registry and receives every
13/// emitted scope, tool, LLM, and mark event until it is deregistered.
14///
15/// # Parameters
16/// - `name`: Unique subscriber name in the global registry.
17/// - `callback`: Subscriber callback invoked for each emitted event.
18///
19/// # Returns
20/// A [`Result`] that is `Ok(())` when the subscriber was registered.
21///
22/// # Errors
23/// Returns [`FlowError::AlreadyExists`] when another global subscriber is
24/// already registered under the same name.
25///
26/// # Notes
27/// Global subscribers remain active across scopes until explicitly removed.
28pub fn register_subscriber(name: &str, callback: EventSubscriberFn) -> Result<()> {
29 ensure_runtime_owner()?;
30 let context = global_context();
31 let mut state = context
32 .write()
33 .map_err(|error| FlowError::Internal(error.to_string()))?;
34 if state.event_subscribers.contains_key(name) {
35 return Err(FlowError::AlreadyExists(format!(
36 "{name} subscriber already exists"
37 )));
38 }
39 state.event_subscribers.insert(name.to_string(), callback);
40 Ok(())
41}
42
43/// Deregister a global lifecycle event subscriber.
44///
45/// This removes the named subscriber from the process-wide registry.
46///
47/// # Parameters
48/// - `name`: Global subscriber name to remove.
49///
50/// # Returns
51/// A [`Result`] containing `true` when a subscriber was removed and `false`
52/// when the name was not registered.
53///
54/// # Errors
55/// Returns an error when the global registry lock cannot be acquired safely.
56///
57/// # Notes
58/// Deregistration affects only future event delivery.
59pub fn deregister_subscriber(name: &str) -> Result<bool> {
60 ensure_runtime_owner()?;
61 let context = global_context();
62 let mut state = context
63 .write()
64 .map_err(|error| FlowError::Internal(error.to_string()))?;
65 Ok(state.event_subscribers.remove(name).is_some())
66}
67
68/// Register a scope-local lifecycle event subscriber.
69///
70/// The subscriber remains active only while the target scope is still present
71/// on the active scope stack.
72///
73/// # Parameters
74/// - `scope_uuid`: UUID of the owning scope.
75/// - `name`: Unique subscriber name within the owning scope.
76/// - `callback`: Subscriber callback invoked for events emitted under that
77/// scope hierarchy.
78///
79/// # Returns
80/// A [`Result`] that is `Ok(())` when the subscriber was registered.
81///
82/// # Errors
83/// Returns [`FlowError::NotFound`] when the scope does not exist on the active
84/// stack and [`FlowError::AlreadyExists`] when the scope already owns a
85/// subscriber with the same name.
86///
87/// # Notes
88/// Scope-local subscribers are removed automatically when the owning scope is
89/// popped.
90pub fn scope_register_subscriber(
91 scope_uuid: &uuid::Uuid,
92 name: &str,
93 callback: EventSubscriberFn,
94) -> Result<()> {
95 ensure_runtime_owner()?;
96 let scope_stack = current_scope_stack();
97 let mut guard = scope_stack.write().expect("scope stack lock poisoned");
98 let registries = guard
99 .local_registries_mut(scope_uuid)
100 .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
101 if registries.event_subscribers.contains_key(name) {
102 return Err(FlowError::AlreadyExists(format!(
103 "{name} subscriber already exists"
104 )));
105 }
106 registries
107 .event_subscribers
108 .insert(name.to_string(), callback);
109 Ok(())
110}
111
112/// Deregister a scope-local lifecycle event subscriber.
113///
114/// This removes the named subscriber from the registry attached to a specific
115/// active scope.
116///
117/// # Parameters
118/// - `scope_uuid`: UUID of the owning scope.
119/// - `name`: Scope-local subscriber name to remove.
120///
121/// # Returns
122/// A [`Result`] containing `true` when a subscriber was removed and `false`
123/// when the name was not registered on that scope.
124///
125/// # Errors
126/// Returns [`FlowError::NotFound`] when the scope does not exist on the active
127/// stack.
128///
129/// # Notes
130/// Deregistration affects only future event delivery for that scope.
131pub fn scope_deregister_subscriber(scope_uuid: &uuid::Uuid, name: &str) -> Result<bool> {
132 ensure_runtime_owner()?;
133 let scope_stack = current_scope_stack();
134 let mut guard = scope_stack.write().expect("scope stack lock poisoned");
135 let registries = guard
136 .local_registries_mut(scope_uuid)
137 .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
138 Ok(registries.event_subscribers.remove(name).is_some())
139}