Skip to main content

nemo_flow/api/
subscriber.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::api::runtime::EventSubscriberFn;
5use crate::api::runtime::current_scope_stack;
6use crate::api::runtime::global_context;
7use crate::api::shared::ensure_runtime_owner;
8use crate::error::{FlowError, Result};
9
10/// Register a global lifecycle event subscriber.
11///
12/// The subscriber is added to the process-wide registry and receives every
13/// emitted scope, tool, LLM, and mark event until it is deregistered.
14///
15/// # Parameters
16/// - `name`: Unique subscriber name in the global registry.
17/// - `callback`: Subscriber callback invoked for each emitted event.
18///
19/// # Returns
20/// A [`Result`] that is `Ok(())` when the subscriber was registered.
21///
22/// # Errors
23/// Returns [`FlowError::AlreadyExists`] when another global subscriber is
24/// already registered under the same name.
25///
26/// # Notes
27/// Global subscribers remain active across scopes until explicitly removed.
28pub fn register_subscriber(name: &str, callback: EventSubscriberFn) -> Result<()> {
29    ensure_runtime_owner()?;
30    let context = global_context();
31    let mut state = context
32        .write()
33        .map_err(|error| FlowError::Internal(error.to_string()))?;
34    if state.event_subscribers.contains_key(name) {
35        return Err(FlowError::AlreadyExists(format!(
36            "{name} subscriber already exists"
37        )));
38    }
39    state.event_subscribers.insert(name.to_string(), callback);
40    Ok(())
41}
42
43/// Deregister a global lifecycle event subscriber.
44///
45/// This removes the named subscriber from the process-wide registry.
46///
47/// # Parameters
48/// - `name`: Global subscriber name to remove.
49///
50/// # Returns
51/// A [`Result`] containing `true` when a subscriber was removed and `false`
52/// when the name was not registered.
53///
54/// # Errors
55/// Returns an error when the global registry lock cannot be acquired safely.
56///
57/// # Notes
58/// Deregistration affects only future event delivery.
59pub fn deregister_subscriber(name: &str) -> Result<bool> {
60    ensure_runtime_owner()?;
61    let context = global_context();
62    let mut state = context
63        .write()
64        .map_err(|error| FlowError::Internal(error.to_string()))?;
65    Ok(state.event_subscribers.remove(name).is_some())
66}
67
68/// Register a scope-local lifecycle event subscriber.
69///
70/// The subscriber remains active only while the target scope is still present
71/// on the active scope stack.
72///
73/// # Parameters
74/// - `scope_uuid`: UUID of the owning scope.
75/// - `name`: Unique subscriber name within the owning scope.
76/// - `callback`: Subscriber callback invoked for events emitted under that
77///   scope hierarchy.
78///
79/// # Returns
80/// A [`Result`] that is `Ok(())` when the subscriber was registered.
81///
82/// # Errors
83/// Returns [`FlowError::NotFound`] when the scope does not exist on the active
84/// stack and [`FlowError::AlreadyExists`] when the scope already owns a
85/// subscriber with the same name.
86///
87/// # Notes
88/// Scope-local subscribers are removed automatically when the owning scope is
89/// popped.
90pub fn scope_register_subscriber(
91    scope_uuid: &uuid::Uuid,
92    name: &str,
93    callback: EventSubscriberFn,
94) -> Result<()> {
95    ensure_runtime_owner()?;
96    let scope_stack = current_scope_stack();
97    let mut guard = scope_stack.write().expect("scope stack lock poisoned");
98    let registries = guard
99        .local_registries_mut(scope_uuid)
100        .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
101    if registries.event_subscribers.contains_key(name) {
102        return Err(FlowError::AlreadyExists(format!(
103            "{name} subscriber already exists"
104        )));
105    }
106    registries
107        .event_subscribers
108        .insert(name.to_string(), callback);
109    Ok(())
110}
111
112/// Deregister a scope-local lifecycle event subscriber.
113///
114/// This removes the named subscriber from the registry attached to a specific
115/// active scope.
116///
117/// # Parameters
118/// - `scope_uuid`: UUID of the owning scope.
119/// - `name`: Scope-local subscriber name to remove.
120///
121/// # Returns
122/// A [`Result`] containing `true` when a subscriber was removed and `false`
123/// when the name was not registered on that scope.
124///
125/// # Errors
126/// Returns [`FlowError::NotFound`] when the scope does not exist on the active
127/// stack.
128///
129/// # Notes
130/// Deregistration affects only future event delivery for that scope.
131pub fn scope_deregister_subscriber(scope_uuid: &uuid::Uuid, name: &str) -> Result<bool> {
132    ensure_runtime_owner()?;
133    let scope_stack = current_scope_stack();
134    let mut guard = scope_stack.write().expect("scope stack lock poisoned");
135    let registries = guard
136        .local_registries_mut(scope_uuid)
137        .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?;
138    Ok(registries.event_subscribers.remove(name).is_some())
139}