// dynamo_runtime/runtime.rs
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! The [Runtime] module is the interface for [crate::component::Component]
//! to access shared resources. These include thread pool, memory allocators and other shared resources.
//!
//! The [Runtime] holds the primary [`CancellationToken`] which can be used to terminate all attached
//! [`crate::component::Component`].
//!
//! We expect in the future to offer topologically aware thread and memory resources, but for now the
//! set of resources is limited to the thread pool and cancellation token.
//!
//! Notes: We will need to do an evaluation on what is fully public, what is pub(crate) and what is
//! private; however, for now we are exposing most objects as fully public while the API is maturing.
use std::{
    mem::ManuallyDrop,
    sync::{Arc, atomic::Ordering},
};

use futures::Future;
use once_cell::sync::OnceCell;
use tokio::{signal, sync::Mutex, task::JoinHandle};

use super::utils::GracefulShutdownTracker;
use crate::{
    compute,
    config::{self, RuntimeConfig},
};

pub use tokio_util::sync::CancellationToken;
31
/// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime].
#[derive(Clone, Debug)]
enum RuntimeType {
    /// A Tokio runtime owned by this process and shared via `Arc`. Wrapped in
    /// [`ManuallyDrop`] so shutdown can be performed explicitly by the `Drop`
    /// impl for `RuntimeType` below, avoiding Tokio's panic when a runtime is
    /// dropped from within an async context.
    Shared(Arc<ManuallyDrop<tokio::runtime::Runtime>>),
    /// A handle to a runtime owned elsewhere (e.g. the caller's runtime);
    /// its lifetime is never managed by us.
    External(tokio::runtime::Handle),
}
38
/// Local [Runtime] which provides access to shared resources local to the physical node/machine.
#[derive(Debug, Clone)]
pub struct Runtime {
    // Unique id (UUIDv4 string) for this Runtime instance; see `Runtime::id()`.
    id: Arc<String>,
    // Primary thread pool on which application/component work runs.
    primary: RuntimeType,
    // Secondary pool for background tasks (e.g. etcd/NATS housekeeping).
    secondary: RuntimeType,
    // Root cancellation token; cancelling it terminates everything attached.
    cancellation_token: CancellationToken,
    // Child of `cancellation_token`; cancelled first during phased shutdown
    // so endpoints drain before backend connections are torn down.
    endpoint_shutdown_token: CancellationToken,
    // Tracks in-flight graceful endpoints so `shutdown` can wait for them.
    graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
    // Optional CPU compute pool; None when disabled or when construction
    // failed (callers then fall back to spawn_blocking).
    compute_pool: Option<Arc<compute::ComputePool>>,
    // Caps concurrent block_in_place calls so at least one worker thread
    // remains for async work (permits = workers - 1, min 1).
    block_in_place_permits: Option<Arc<tokio::sync::Semaphore>>,
}
51
impl Runtime {
    /// Construct a [Runtime] from a primary [RuntimeType] and an optional
    /// secondary one.
    ///
    /// When `secondary` is `None`, a dedicated single-threaded Tokio runtime
    /// is created for background work. The compute pool and block-in-place
    /// permits are left unset here; [`Runtime::new_with_config`] fills them in.
    fn new(runtime: RuntimeType, secondary: Option<RuntimeType>) -> anyhow::Result<Runtime> {
        // worker id: fresh UUIDv4 uniquely identifying this Runtime instance
        let id = Arc::new(uuid::Uuid::new_v4().to_string());

        // create a cancellation token — the root token for the whole Runtime
        let cancellation_token = CancellationToken::new();

        // create endpoint shutdown token as a child of the main token;
        // cancelling the main token also cancels it, but it can be cancelled
        // on its own first during phased shutdown (see `shutdown`)
        let endpoint_shutdown_token = cancellation_token.child_token();

        // secondary runtime for background etcd/nats tasks
        let secondary = match secondary {
            Some(secondary) => secondary,
            None => {
                tracing::debug!("Created secondary runtime with single thread");
                RuntimeType::Shared(Arc::new(ManuallyDrop::new(
                    RuntimeConfig::single_threaded().create_runtime()?,
                )))
            }
        };

        // Initialize compute pool with default config
        // This will be properly configured when created from RuntimeConfig
        // (see `new_with_config`); plain `new` leaves both disabled.
        let compute_pool = None;
        let block_in_place_permits = None;

        Ok(Runtime {
            id,
            primary: runtime,
            secondary,
            cancellation_token,
            endpoint_shutdown_token,
            graceful_shutdown_tracker: Arc::new(GracefulShutdownTracker::new()),
            compute_pool,
            block_in_place_permits,
        })
    }

    /// Like [`Runtime::new`], but additionally wires up the CPU compute pool
    /// and the `block_in_place` permit semaphore from `config`.
    ///
    /// `compute_threads == Some(0)` explicitly disables the compute pool; a
    /// pool construction failure is logged and tolerated (CPU-heavy work then
    /// falls back to `spawn_blocking`).
    fn new_with_config(
        runtime: RuntimeType,
        secondary: Option<RuntimeType>,
        config: &RuntimeConfig,
    ) -> anyhow::Result<Runtime> {
        let mut rt = Self::new(runtime, secondary)?;

        // Create compute pool from configuration
        let compute_config = crate::compute::ComputeConfig {
            num_threads: config.compute_threads,
            stack_size: config.compute_stack_size,
            thread_prefix: config.compute_thread_prefix.clone(),
            // thread pinning is not surfaced through RuntimeConfig (yet)
            pin_threads: false,
        };

        // Check if compute pool is explicitly disabled
        if config.compute_threads == Some(0) {
            tracing::info!("Compute pool disabled (compute_threads = 0)");
        } else {
            match crate::compute::ComputePool::new(compute_config) {
                Ok(pool) => {
                    rt.compute_pool = Some(Arc::new(pool));
                    tracing::debug!(
                        "Initialized compute pool with {} threads",
                        rt.compute_pool.as_ref().unwrap().num_threads()
                    );
                }
                Err(e) => {
                    // Non-fatal: `compute_pool()` returns None and callers
                    // degrade to spawn_blocking.
                    tracing::warn!(
                        "Failed to create compute pool: {}. CPU-intensive operations will use spawn_blocking",
                        e
                    );
                }
            }
        }

        // Initialize block_in_place semaphore based on actual worker threads
        // NOTE(review): `available_parallelism()` is unwrap()ed; it can fail
        // on some platforms — consider propagating the error instead.
        let num_workers = config
            .num_worker_threads
            .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get());
        // Reserve at least one thread for async work
        let permits = num_workers.saturating_sub(1).max(1);
        rt.block_in_place_permits = Some(Arc::new(tokio::sync::Semaphore::new(permits)));
        tracing::debug!(
            "Initialized block_in_place permits: {} (from {} worker threads)",
            permits,
            num_workers
        );

        Ok(rt)
    }

    /// Initialize thread-local compute context on the current thread.
    /// This should be called on each Tokio worker thread.
    ///
    /// No-op unless both the compute pool and the permit semaphore were
    /// configured (i.e. the Runtime came from `new_with_config`).
    pub fn initialize_thread_local(&self) {
        if let (Some(pool), Some(permits)) = (&self.compute_pool, &self.block_in_place_permits) {
            crate::compute::thread_local::initialize_context(Arc::clone(pool), Arc::clone(permits));
        }
    }

    /// Initialize thread-local compute context on all worker threads using a barrier.
    /// This ensures every worker thread has its thread-local context initialized.
    ///
    /// NOTE(review): this relies on `tokio::task::spawn_blocking`, which runs
    /// tasks on Tokio's dedicated *blocking* thread pool, not on the async
    /// worker threads — so the contexts may end up initialized on
    /// blocking-pool threads instead. Confirm against Tokio's threading model
    /// before relying on this.
    pub async fn initialize_all_thread_locals(&self) -> anyhow::Result<()> {
        if let (Some(pool), Some(permits)) = (&self.compute_pool, &self.block_in_place_permits) {
            // First, detect how many worker threads we actually have
            let num_workers = self.detect_worker_thread_count().await;

            if num_workers == 0 {
                return Err(anyhow::anyhow!("No worker threads detected"));
            }

            // Create a barrier that all threads must reach; rendezvousing
            // forces the tasks onto `num_workers` distinct threads at once
            let barrier = Arc::new(std::sync::Barrier::new(num_workers));
            let init_pool = Arc::clone(pool);
            let init_permits = Arc::clone(permits);

            // Spawn exactly one blocking task per worker thread
            let mut handles = Vec::new();
            for i in 0..num_workers {
                let barrier_clone = Arc::clone(&barrier);
                let pool_clone = Arc::clone(&init_pool);
                let permits_clone = Arc::clone(&init_permits);

                let handle = tokio::task::spawn_blocking(move || {
                    // Wait at barrier - ensures all threads are participating
                    barrier_clone.wait();

                    // Now initialize thread-local storage
                    crate::compute::thread_local::initialize_context(pool_clone, permits_clone);

                    // Get thread ID for logging
                    let thread_id = std::thread::current().id();
                    tracing::trace!(
                        "Initialized thread-local compute context on thread {:?} (worker {})",
                        thread_id,
                        i
                    );
                });
                handles.push(handle);
            }

            // Wait for all tasks to complete; `?` propagates a panicked or
            // cancelled task as an error
            for handle in handles {
                handle.await?;
            }

            tracing::info!(
                "Successfully initialized thread-local compute context on {} worker threads",
                num_workers
            );
        } else {
            tracing::debug!("No compute pool configured, skipping thread-local initialization");
        }
        Ok(())
    }

    /// Detect the number of worker threads in the runtime.
    ///
    /// Spawns many short blocking probes and counts the distinct thread ids
    /// they observe. NOTE(review): `spawn_blocking` probes land on Tokio's
    /// blocking pool (which grows on demand), so this likely counts
    /// blocking-pool threads rather than async worker threads — confirm the
    /// intended semantics.
    async fn detect_worker_thread_count(&self) -> usize {
        use parking_lot::Mutex;
        use std::collections::HashSet;

        let thread_ids = Arc::new(Mutex::new(HashSet::new()));
        let mut handles = Vec::new();

        // Spawn many blocking tasks, aiming to land on every thread at least
        // once; 100 probes is a heuristic, not a guarantee of full coverage
        let num_probes = 100;
        for _ in 0..num_probes {
            let ids = Arc::clone(&thread_ids);
            let handle = tokio::task::spawn_blocking(move || {
                let thread_id = std::thread::current().id();
                ids.lock().insert(thread_id);
            });
            handles.push(handle);
        }

        // Wait for all probes to complete (errors ignored: a failed probe
        // just means one fewer sample)
        for handle in handles {
            let _ = handle.await;
        }

        let count = thread_ids.lock().len();
        tracing::debug!("Detected {} worker threads in runtime", count);
        count
    }

    /// Wrap the ambient Tokio runtime; must be called from async context or
    /// `Handle::current()` panics.
    pub fn from_current() -> anyhow::Result<Runtime> {
        Runtime::from_handle(tokio::runtime::Handle::current())
    }

    /// Build a [Runtime] around an externally-owned Tokio runtime. Primary
    /// and secondary share the same pool; the external runtime's lifetime is
    /// never managed by us.
    pub fn from_handle(handle: tokio::runtime::Handle) -> anyhow::Result<Runtime> {
        let primary = RuntimeType::External(handle.clone());
        let secondary = RuntimeType::External(handle);
        Runtime::new(primary, Some(secondary))
    }

    /// Create a [`Runtime`] instance from the settings
    /// See [`config::RuntimeConfig::from_settings`]
    pub fn from_settings() -> anyhow::Result<Runtime> {
        let config = config::RuntimeConfig::from_settings()?;
        let runtime = Arc::new(ManuallyDrop::new(config.create_runtime()?));
        let primary = RuntimeType::Shared(runtime.clone());
        // secondary reuses the primary's threads; External so that only the
        // Shared owner participates in shutdown (see Drop for RuntimeType)
        let secondary = RuntimeType::External(runtime.handle().clone());
        Runtime::new_with_config(primary, Some(secondary), &config)
    }

    /// Create a [`Runtime`] with two single-threaded async tokio runtime
    /// (the primary created here; the secondary created inside [`Runtime::new`]
    /// because `None` is passed).
    pub fn single_threaded() -> anyhow::Result<Runtime> {
        let config = config::RuntimeConfig::single_threaded();
        let owned = RuntimeType::Shared(Arc::new(ManuallyDrop::new(config.create_runtime()?)));
        Runtime::new(owned, None)
    }

    /// Returns the unique identifier for the [`Runtime`]
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Returns a [`tokio::runtime::Handle`] for the primary/application thread pool
    pub fn primary(&self) -> tokio::runtime::Handle {
        self.primary.handle()
    }

    /// Returns a [`tokio::runtime::Handle`] for the secondary/background thread pool
    pub fn secondary(&self) -> tokio::runtime::Handle {
        self.secondary.handle()
    }

    /// Access the primary [`CancellationToken`] for the [`Runtime`]
    pub fn primary_token(&self) -> CancellationToken {
        self.cancellation_token.clone()
    }

    /// Creates a child [`CancellationToken`] tied to the life-cycle of the [`Runtime`]'s endpoint shutdown token.
    ///
    /// Children are cancelled in shutdown phase 1, before the primary token.
    pub fn child_token(&self) -> CancellationToken {
        self.endpoint_shutdown_token.child_token()
    }

    /// Get access to the graceful shutdown tracker
    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.graceful_shutdown_tracker.clone()
    }

    /// Get access to the compute pool for CPU-intensive operations
    ///
    /// Returns None if the compute pool was not initialized (e.g., due to configuration error)
    pub fn compute_pool(&self) -> Option<&Arc<crate::compute::ComputePool>> {
        self.compute_pool.as_ref()
    }

    /// Shuts down the [`Runtime`] instance
    ///
    /// Non-blocking, three-phase: (1) cancel the endpoint token so endpoints
    /// stop accepting work, (2) wait for tracked graceful endpoints to drain,
    /// (3) cancel the main token, which disconnects backend services.
    /// Returns immediately; the phases run on a task spawned onto the primary
    /// runtime.
    pub fn shutdown(&self) {
        tracing::info!("Runtime shutdown initiated");

        // Spawn the shutdown coordination task BEFORE cancelling tokens
        let tracker = self.graceful_shutdown_tracker.clone();
        let main_token = self.cancellation_token.clone();
        let endpoint_token = self.endpoint_shutdown_token.clone();

        // Use the runtime handle to spawn the task
        let handle = self.primary();
        handle.spawn(async move {
            // Phase 1: Cancel endpoint shutdown token to stop accepting new requests
            tracing::info!("Phase 1: Cancelling endpoint shutdown token");
            endpoint_token.cancel();

            // Phase 2: Wait for all graceful endpoints to complete
            tracing::info!("Phase 2: Waiting for graceful endpoints to complete");

            let count = tracker.get_count();
            tracing::info!("Active graceful endpoints: {}", count);

            // NOTE(review): the count is sampled once; an endpoint registering
            // after this read is only covered if wait_for_completion re-checks —
            // verify GracefulShutdownTracker's semantics.
            if count != 0 {
                tracker.wait_for_completion().await;
            }

            // Phase 3: Now connections will be disconnected to backend services (e.g. NATS/ETCD) by cancelling the main token
            tracing::info!(
                "Phase 3: All endpoints ended gracefully. Connections to backend services will now be disconnected"
            );
            main_token.cancel();
        });
    }
}
335
336impl RuntimeType {
337    /// Get [`tokio::runtime::Handle`] to runtime
338    pub fn handle(&self) -> tokio::runtime::Handle {
339        match self {
340            RuntimeType::External(rt) => rt.clone(),
341            RuntimeType::Shared(rt) => rt.handle().clone(),
342        }
343    }
344}
345
/// Handle dropping a tokio runtime from an async context.
///
/// When used from the Python bindings the runtime will be dropped from (I think) Python's asyncio.
/// Tokio does not allow this and will panic. That panic prevents logging from printing its last
/// messages, which makes knowing what went wrong very difficult.
///
/// This is the panic:
/// > pyo3_runtime.PanicException: Cannot drop a runtime in a context where blocking is not allowed.
/// > This happens when a runtime is dropped from within an asynchronous context.
///
/// Hence we wrap the runtime in a ManuallyDrop and use tokio's alternative shutdown if we detect
/// that we are inside an async runtime.
impl Drop for RuntimeType {
    fn drop(&mut self) {
        match self {
            // External handles are owned elsewhere; nothing to shut down here.
            RuntimeType::External(_) => {}
            RuntimeType::Shared(arc) => {
                // Only proceed if we are the only owner of the shared pointer,
                // meaning one strong count and no weak count: `Arc::get_mut`
                // returns None otherwise, and we leave shutdown to the last owner.
                let Some(md_runtime) = Arc::get_mut(arc) else {
                    return;
                };
                if tokio::runtime::Handle::try_current().is_ok() {
                    // We are inside an async runtime.
                    // SAFETY: `Arc::get_mut` gave us exclusive access, and the
                    // inner runtime is taken exactly once on this path; the
                    // ManuallyDrop left behind in the Arc never drops its
                    // contents again (its own Drop is a no-op).
                    let tokio_runtime = unsafe { ManuallyDrop::take(md_runtime) };
                    tokio_runtime.shutdown_background();
                } else {
                    // We are not inside an async context, dropping the runtime is safe.
                    //
                    // We never reach this case. I'm not sure why, something about the interaction
                    // with pyo3 and Python lifetimes.
                    //
                    // Process is gone so doesn't really matter, but TODO now that we realize it.
                    //
                    // SAFETY: exclusive access via `Arc::get_mut`, and this is
                    // the only place the inner value is dropped on this path,
                    // so no double-drop can occur.
                    unsafe { ManuallyDrop::drop(md_runtime) };
                }
            }
        }
    }
}