dynamo_runtime/
runtime.rs1use super::utils::GracefulShutdownTracker;
17use crate::{
18 compute,
19 config::{self, RuntimeConfig},
20};
21
22use futures::Future;
23use once_cell::sync::OnceCell;
24use std::{
25 mem::ManuallyDrop,
26 sync::{Arc, atomic::Ordering},
27};
28use tokio::{signal, sync::Mutex, task::JoinHandle};
29
30pub use tokio_util::sync::CancellationToken;
31
32#[derive(Clone, Debug)]
34enum RuntimeType {
35 Shared(Arc<ManuallyDrop<tokio::runtime::Runtime>>),
36 External(tokio::runtime::Handle),
37}
38
39#[derive(Debug, Clone)]
41pub struct Runtime {
42 id: Arc<String>,
43 primary: RuntimeType,
44 secondary: RuntimeType,
45 cancellation_token: CancellationToken,
46 endpoint_shutdown_token: CancellationToken,
47 graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
48 compute_pool: Option<Arc<compute::ComputePool>>,
49 block_in_place_permits: Option<Arc<tokio::sync::Semaphore>>,
50}
51
52impl Runtime {
53 fn new(runtime: RuntimeType, secondary: Option<RuntimeType>) -> anyhow::Result<Runtime> {
54 crate::nvtx::init();
56
57 let id = Arc::new(uuid::Uuid::new_v4().to_string());
59
60 let cancellation_token = CancellationToken::new();
62
63 let endpoint_shutdown_token = cancellation_token.child_token();
65
66 let secondary = match secondary {
68 Some(secondary) => secondary,
69 None => {
70 tracing::debug!("Created secondary runtime with single thread");
71 RuntimeType::Shared(Arc::new(ManuallyDrop::new(
72 RuntimeConfig::single_threaded().create_runtime()?,
73 )))
74 }
75 };
76
77 let compute_pool = None;
80 let block_in_place_permits = None;
81
82 Ok(Runtime {
83 id,
84 primary: runtime,
85 secondary,
86 cancellation_token,
87 endpoint_shutdown_token,
88 graceful_shutdown_tracker: Arc::new(GracefulShutdownTracker::new()),
89 compute_pool,
90 block_in_place_permits,
91 })
92 }
93
94 fn new_with_config(
95 runtime: RuntimeType,
96 secondary: Option<RuntimeType>,
97 config: &RuntimeConfig,
98 ) -> anyhow::Result<Runtime> {
99 let mut rt = Self::new(runtime, secondary)?;
100
101 let compute_config = crate::compute::ComputeConfig {
103 num_threads: config.compute_threads,
104 stack_size: config.compute_stack_size,
105 thread_prefix: config.compute_thread_prefix.clone(),
106 pin_threads: false,
107 };
108
109 if config.compute_threads == Some(0) {
111 tracing::info!("Compute pool disabled (compute_threads = 0)");
112 } else {
113 match crate::compute::ComputePool::new(compute_config) {
114 Ok(pool) => {
115 rt.compute_pool = Some(Arc::new(pool));
116 tracing::debug!(
117 "Initialized compute pool with {} threads",
118 rt.compute_pool.as_ref().unwrap().num_threads()
119 );
120 }
121 Err(e) => {
122 tracing::warn!(
123 "Failed to create compute pool: {}. CPU-intensive operations will use spawn_blocking",
124 e
125 );
126 }
127 }
128 }
129
130 let num_workers = config
132 .num_worker_threads
133 .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get());
134 let permits = num_workers.saturating_sub(1).max(1);
136 rt.block_in_place_permits = Some(Arc::new(tokio::sync::Semaphore::new(permits)));
137 tracing::debug!(
138 "Initialized block_in_place permits: {} (from {} worker threads)",
139 permits,
140 num_workers
141 );
142
143 Ok(rt)
144 }
145
146 pub fn initialize_thread_local(&self) {
149 if let (Some(pool), Some(permits)) = (&self.compute_pool, &self.block_in_place_permits) {
150 crate::compute::thread_local::initialize_context(Arc::clone(pool), Arc::clone(permits));
151 }
152 let thread_name = std::thread::current()
154 .name()
155 .map(|n| n.to_string())
156 .unwrap_or_else(|| format!("tokio-worker-{:?}", std::thread::current().id()));
157 crate::nvtx::name_current_thread_impl(&thread_name);
158 }
159
160 pub async fn initialize_all_thread_locals(&self) -> anyhow::Result<()> {
163 if let (Some(pool), Some(permits)) = (&self.compute_pool, &self.block_in_place_permits) {
164 let num_workers = self.detect_worker_thread_count().await;
166
167 if num_workers == 0 {
168 return Err(anyhow::anyhow!("No worker threads detected"));
169 }
170
171 let barrier = Arc::new(std::sync::Barrier::new(num_workers));
173 let init_pool = Arc::clone(pool);
174 let init_permits = Arc::clone(permits);
175
176 let mut handles = Vec::new();
178 for i in 0..num_workers {
179 let barrier_clone = Arc::clone(&barrier);
180 let pool_clone = Arc::clone(&init_pool);
181 let permits_clone = Arc::clone(&init_permits);
182
183 let handle = tokio::task::spawn_blocking(move || {
184 barrier_clone.wait();
186
187 crate::compute::thread_local::initialize_context(pool_clone, permits_clone);
189
190 let thread_id = std::thread::current().id();
192 tracing::trace!(
193 "Initialized thread-local compute context on thread {:?} (worker {})",
194 thread_id,
195 i
196 );
197 });
198 handles.push(handle);
199 }
200
201 for handle in handles {
203 handle.await?;
204 }
205
206 tracing::info!(
207 "Successfully initialized thread-local compute context on {} worker threads",
208 num_workers
209 );
210 } else {
211 tracing::debug!("No compute pool configured, skipping thread-local initialization");
212 }
213 Ok(())
214 }
215
216 async fn detect_worker_thread_count(&self) -> usize {
218 use parking_lot::Mutex;
219 use std::collections::HashSet;
220
221 let thread_ids = Arc::new(Mutex::new(HashSet::new()));
222 let mut handles = Vec::new();
223
224 let num_probes = 100;
227 for _ in 0..num_probes {
228 let ids = Arc::clone(&thread_ids);
229 let handle = tokio::task::spawn_blocking(move || {
230 let thread_id = std::thread::current().id();
231 ids.lock().insert(thread_id);
232 });
233 handles.push(handle);
234 }
235
236 for handle in handles {
238 let _ = handle.await;
239 }
240
241 let count = thread_ids.lock().len();
242 tracing::debug!("Detected {count} worker threads in runtime");
243 count
244 }
245
246 pub fn from_current() -> anyhow::Result<Runtime> {
247 Runtime::from_handle(tokio::runtime::Handle::current())
248 }
249
250 pub fn from_handle(handle: tokio::runtime::Handle) -> anyhow::Result<Runtime> {
251 let primary = RuntimeType::External(handle.clone());
252 let secondary = RuntimeType::External(handle);
253 Runtime::new(primary, Some(secondary))
254 }
255
256 pub fn from_settings() -> anyhow::Result<Runtime> {
259 let config = config::RuntimeConfig::from_settings()?;
260 let runtime = Arc::new(ManuallyDrop::new(config.create_runtime()?));
261 let primary = RuntimeType::Shared(runtime.clone());
262 let secondary = RuntimeType::External(runtime.handle().clone());
263 Runtime::new_with_config(primary, Some(secondary), &config)
264 }
265
266 pub fn single_threaded() -> anyhow::Result<Runtime> {
268 let config = config::RuntimeConfig::single_threaded();
269 let owned = RuntimeType::Shared(Arc::new(ManuallyDrop::new(config.create_runtime()?)));
270 Runtime::new(owned, None)
271 }
272
273 pub fn id(&self) -> &str {
275 &self.id
276 }
277
278 pub fn primary(&self) -> tokio::runtime::Handle {
280 self.primary.handle()
281 }
282
283 pub fn secondary(&self) -> tokio::runtime::Handle {
285 self.secondary.handle()
286 }
287
288 pub fn primary_token(&self) -> CancellationToken {
290 self.cancellation_token.clone()
291 }
292
293 pub fn child_token(&self) -> CancellationToken {
295 self.endpoint_shutdown_token.child_token()
296 }
297
298 pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
300 self.graceful_shutdown_tracker.clone()
301 }
302
303 pub fn compute_pool(&self) -> Option<&Arc<crate::compute::ComputePool>> {
307 self.compute_pool.as_ref()
308 }
309
310 pub fn shutdown(&self) {
312 tracing::info!("Runtime shutdown initiated");
313
314 let tracker = self.graceful_shutdown_tracker.clone();
316 let main_token = self.cancellation_token.clone();
317 let endpoint_token = self.endpoint_shutdown_token.clone();
318
319 let handle = self.primary();
321 handle.spawn(async move {
322 tracing::info!("Phase 1: Cancelling endpoint shutdown token");
324 endpoint_token.cancel();
325
326 tracing::info!("Phase 2: Waiting for graceful endpoints to complete");
328
329 let count = tracker.get_count();
330 tracing::info!("Active graceful endpoints: {count}");
331
332 if count != 0 {
333 tracker.wait_for_completion().await;
334 }
335
336 tracing::info!(
338 "Phase 3: All endpoints ended gracefully. Connections to backend services will now be disconnected"
339 );
340 main_token.cancel();
341 });
342 }
343}
344
345impl RuntimeType {
346 pub fn handle(&self) -> tokio::runtime::Handle {
348 match self {
349 RuntimeType::External(rt) => rt.clone(),
350 RuntimeType::Shared(rt) => rt.handle().clone(),
351 }
352 }
353}
354
355impl Drop for RuntimeType {
368 fn drop(&mut self) {
369 match self {
370 RuntimeType::External(_) => {}
371 RuntimeType::Shared(arc) => {
372 let Some(md_runtime) = Arc::get_mut(arc) else {
373 return;
376 };
377 if tokio::runtime::Handle::try_current().is_ok() {
378 let tokio_runtime = unsafe { ManuallyDrop::take(md_runtime) };
380 tokio_runtime.shutdown_background();
381 } else {
382 unsafe { ManuallyDrop::drop(md_runtime) };
389 }
390 }
391 }
392 }
393}