dynamo_runtime/pipeline/
registry.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use std::any::Any;
17use std::collections::HashMap;
18use std::sync::Arc;
19
/// A keyed store for type-erased values, split into two independent namespaces.
///
/// * **Shared** objects are wrapped in an [`Arc`]. [`Registry::get_shared`] hands out
///   another handle to the same value and leaves the entry in place.
/// * **Unique** objects are boxed and stay in the registry until
///   [`Registry::take_unique`] moves them out. A copy can be obtained without removing
///   the entry via [`Registry::clone_unique`] when the stored type implements [`Clone`].
///
/// The two namespaces use separate maps, so the same key may exist in both at once.
/// Values are type-erased on insertion; every typed accessor must name exactly the type
/// that was inserted, otherwise it returns an error.
///
/// # Examples
///
/// ```
/// use dynamo_runtime::pipeline::registry::Registry;
///
/// let mut registry = Registry::new();
///
/// // Insert and retrieve shared objects
/// registry.insert_shared("shared1", 42);
/// assert_eq!(*registry.get_shared::<i32>("shared1").unwrap(), 42);
///
/// // Insert and take unique objects
/// registry.insert_unique("unique1", "Hello".to_string());
/// assert_eq!(registry.take_unique::<String>("unique1").unwrap(), "Hello");
///
/// // Taking the same unique object again fails because it has been moved out
/// assert!(registry.take_unique::<String>("unique1").is_err());
///
/// // Insert and clone unique objects
/// registry.insert_unique("unique2", "World".to_string());
/// assert_eq!(registry.clone_unique::<String>("unique2").unwrap(), "World");
///
/// // Asking for the wrong type fails and leaves the object in the registry
/// assert!(registry.take_unique::<i32>("unique2").is_err());
///
/// // Taking a unique object after it has been cloned is fine
/// assert!(registry.take_unique::<String>("unique2").is_ok());
///
/// ```
#[derive(Debug, Default)]
pub struct Registry {
    /// Shared objects: reference-counted, retrieved by cloning the `Arc`.
    shared_storage: HashMap<String, Arc<dyn Any + Send + Sync>>,
    /// Unique ("takable") objects: owned by the registry until taken.
    unique_storage: HashMap<String, Box<dyn Any + Send + Sync>>,
}

impl Registry {
    /// Create a new empty registry.
    pub fn new() -> Self {
        Self::default()
    }

    /// Check if a shared object exists in the registry by key.
    ///
    /// Only the key is checked; the stored type is not considered.
    pub fn contains_shared(&self, key: &str) -> bool {
        self.shared_storage.contains_key(key)
    }

    /// Insert a shared object into the registry with a specific key.
    ///
    /// If the key is already present in the shared namespace, the previous entry is
    /// replaced.
    pub fn insert_shared<K: ToString, U: Send + Sync + 'static>(&mut self, key: K, value: U) {
        let erased: Arc<dyn Any + Send + Sync> = Arc::new(value);
        self.shared_storage.insert(key.to_string(), erased);
    }

    /// Retrieve a shared object from the registry by key and type.
    ///
    /// The entry stays in the registry; the returned `Arc` is an additional handle to
    /// the same value.
    ///
    /// # Errors
    ///
    /// Returns an error message if the key is not present in the shared namespace, or
    /// if the stored value is not of type `V`.
    pub fn get_shared<V: Send + Sync + 'static>(&self, key: &str) -> Result<Arc<V>, String> {
        let stored = self
            .shared_storage
            .get(key)
            .ok_or_else(|| format!("Shared key not found: {}", key))?;

        Arc::clone(stored).downcast::<V>().map_err(|_| {
            format!(
                "Failed to downcast to the requested type for shared key: {}",
                key
            )
        })
    }

    /// Check if a unique object exists in the registry by key.
    ///
    /// Only the key is checked; the stored type is not considered.
    pub fn contains_unique(&self, key: &str) -> bool {
        self.unique_storage.contains_key(key)
    }

    /// Insert a unique object into the registry with a specific key.
    ///
    /// If the key is already present in the unique namespace, the previous object is
    /// replaced and dropped.
    pub fn insert_unique<K: ToString, U: Send + Sync + 'static>(&mut self, key: K, value: U) {
        let erased: Box<dyn Any + Send + Sync> = Box::new(value);
        self.unique_storage.insert(key.to_string(), erased);
    }

    /// Take a unique object from the registry by key and type, removing it from the registry.
    ///
    /// The entry is removed only when the call succeeds. A request for the wrong type
    /// leaves the stored object untouched, so it can still be taken later with the
    /// correct type.
    ///
    /// # Errors
    ///
    /// Returns an error message if the key is not present in the unique namespace, or
    /// if the stored value is not of type `V`.
    pub fn take_unique<V: Send + Sync + 'static>(&mut self, key: &str) -> Result<V, String> {
        // Inspect the stored type *before* removing anything: removing first would
        // drop the object on a type mismatch even though the call reports failure.
        let type_matches = self
            .unique_storage
            .get(key)
            .map(|stored| stored.is::<V>())
            .ok_or_else(|| format!("Takable key not found: {}", key))?;

        if !type_matches {
            return Err(Self::unique_downcast_error(key));
        }

        // The type was verified above, so the downcast cannot fail here; the error
        // mapping only exists to avoid a panic path.
        self.unique_storage
            .remove(key)
            .and_then(|stored| stored.downcast::<V>().ok())
            .map(|typed| *typed)
            .ok_or_else(|| Self::unique_downcast_error(key))
    }

    /// Clone a unique object from the registry if it implements `Clone`.
    ///
    /// The stored object stays in the registry and can still be taken afterwards.
    ///
    /// # Errors
    ///
    /// Returns an error message if the key is not present in the unique namespace, or
    /// if the stored value is not of type `V`.
    pub fn clone_unique<V: Clone + Send + Sync + 'static>(&self, key: &str) -> Result<V, String> {
        let stored = self
            .unique_storage
            .get(key)
            .ok_or_else(|| format!("Takable key not found: {}", key))?;

        stored
            .downcast_ref::<V>()
            .cloned()
            .ok_or_else(|| Self::unique_downcast_error(key))
    }

    /// Build the error message reported when a unique entry holds a different type
    /// than the one requested.
    fn unique_downcast_error(key: &str) -> String {
        format!(
            "Failed to downcast to the requested type for unique key: {}",
            key
        )
    }
}
128
#[cfg(test)]
mod tests {
    use super::*;

    /// A shared value is returned for the type it was inserted with, rejected for
    /// any other type, and remains available after being read.
    #[test]
    fn test_insert_and_get_shared() {
        let mut registry = Registry::new();
        registry.insert_shared("shared1", 42);
        assert_eq!(*registry.get_shared::<i32>("shared1").unwrap(), 42);
        assert!(registry.get_shared::<f64>("shared1").is_err()); // Testing a downcast failure

        // Reading a shared value does not consume it.
        assert_eq!(*registry.get_shared::<i32>("shared1").unwrap(), 42);
    }

    /// Looking up a key that was never inserted reports an error, and
    /// `contains_shared` reflects insertions into the shared namespace only.
    #[test]
    fn test_shared_missing_key_and_contains() {
        let mut registry = Registry::new();
        assert!(!registry.contains_shared("absent"));
        assert_eq!(
            registry.get_shared::<i32>("absent").unwrap_err(),
            "Shared key not found: absent"
        );

        registry.insert_shared("present", 1u8);
        assert!(registry.contains_shared("present"));
        // The unique namespace is stored separately.
        assert!(!registry.contains_unique("present"));
    }

    /// Taking a unique value moves it out, so a second take fails.
    #[test]
    fn test_insert_and_take_unique() {
        let mut registry = Registry::new();
        registry.insert_unique("unique1", "Hello".to_string());
        assert!(registry.contains_unique("unique1"));
        assert_eq!(registry.take_unique::<String>("unique1").unwrap(), "Hello");
        assert!(!registry.contains_unique("unique1"));
        assert!(registry.take_unique::<String>("unique1").is_err()); // Key is now missing
    }

    /// Cloning a unique value leaves the original in place for a later take.
    #[test]
    fn test_insert_and_clone_then_take_unique() {
        let mut registry = Registry::new();

        registry.insert_unique("unique2", "World".to_string());

        assert_eq!(registry.clone_unique::<String>("unique2").unwrap(), "World");

        // When cloned, the object should still be available for taking
        assert!(registry.take_unique::<String>("unique2").is_ok());
    }

    /// Despite its name, this checks that a take *succeeds* after a clone and that
    /// only the second take fails.
    #[test]
    fn test_failed_take_after_cloning() {
        let mut registry = Registry::new();

        registry.insert_unique("unique3", "Another".to_string());
        assert_eq!(
            registry.clone_unique::<String>("unique3").unwrap(),
            "Another"
        );

        // Cloned, then Take is OK
        assert_eq!(
            registry.take_unique::<String>("unique3").unwrap(),
            "Another"
        );

        // Take, then Take again should fail
        assert!(registry.take_unique::<String>("unique3").is_err());
    }

    /// `clone_unique` reports both a missing key and a type mismatch as errors, and
    /// a failed clone does not remove the entry.
    #[test]
    fn test_clone_unique_errors() {
        let mut registry = Registry::new();
        assert_eq!(
            registry.clone_unique::<String>("absent").unwrap_err(),
            "Takable key not found: absent"
        );

        registry.insert_unique("unique4", "Value".to_string());
        assert_eq!(
            registry.clone_unique::<i32>("unique4").unwrap_err(),
            "Failed to downcast to the requested type for unique key: unique4"
        );
        assert!(registry.contains_unique("unique4"));
    }

    /// Requesting the wrong type from `take_unique` is an error.
    #[test]
    fn test_take_unique_wrong_type_is_error() {
        let mut registry = Registry::new();
        registry.insert_unique("unique5", "Value".to_string());
        assert!(registry.take_unique::<i32>("unique5").is_err());
    }

    /// Inserting under an existing key replaces the previous value in both namespaces.
    #[test]
    fn test_insert_overwrites_existing_key() {
        let mut registry = Registry::new();

        registry.insert_shared("key", 1);
        registry.insert_shared("key", 2);
        assert_eq!(*registry.get_shared::<i32>("key").unwrap(), 2);

        registry.insert_unique("key", "first".to_string());
        registry.insert_unique("key", "second".to_string());
        assert_eq!(registry.take_unique::<String>("key").unwrap(), "second");
    }
}