dynamo_runtime/
runtime.rs1use super::utils::GracefulShutdownTracker;
17use crate::{
18 compute,
19 config::{self, RuntimeConfig},
20};
21
22use futures::Future;
23use once_cell::sync::OnceCell;
24use std::{
25 mem::ManuallyDrop,
26 sync::{Arc, atomic::Ordering},
27};
28use tokio::{signal, sync::Mutex, task::JoinHandle};
29
30pub use tokio_util::sync::CancellationToken;
31
32#[derive(Clone, Debug)]
34enum RuntimeType {
35 Shared(Arc<ManuallyDrop<tokio::runtime::Runtime>>),
36 External(tokio::runtime::Handle),
37}
38
/// Handle to the process runtime: a primary and a secondary tokio runtime,
/// cancellation tokens used for staged shutdown, and optional compute resources.
///
/// Every field is an `Arc`, a `CancellationToken`, or a `RuntimeType`, so
/// clones share the same underlying runtimes, tokens and tracker.
///
/// Fields are dropped in declaration order, so `primary` is dropped before
/// `secondary`. Keep this in mind before reordering them.
#[derive(Debug, Clone)]
pub struct Runtime {
    /// Random v4 UUID string identifying this runtime instance.
    id: Arc<String>,
    /// Main runtime. `shutdown()` spawns its shutdown sequence onto this one.
    primary: RuntimeType,
    /// Secondary runtime. It may wrap the same runtime as `primary` (see
    /// `from_handle` / `from_settings`) or a separate single-threaded runtime.
    secondary: RuntimeType,
    /// Root cancellation token; `shutdown()` cancels it last (phase 3).
    cancellation_token: CancellationToken,
    /// Child of `cancellation_token`; `shutdown()` cancels it first (phase 1).
    endpoint_shutdown_token: CancellationToken,
    /// Tracks graceful endpoints that `shutdown()` waits on before phase 3.
    graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
    /// CPU compute pool; `None` unless `new_with_config` created one.
    compute_pool: Option<Arc<compute::ComputePool>>,
    /// Semaphore sized from the worker-thread count; `None` unless set by
    /// `new_with_config`.
    block_in_place_permits: Option<Arc<tokio::sync::Semaphore>>,
}
51
52impl Runtime {
53 fn new(runtime: RuntimeType, secondary: Option<RuntimeType>) -> anyhow::Result<Runtime> {
54 let id = Arc::new(uuid::Uuid::new_v4().to_string());
56
57 let cancellation_token = CancellationToken::new();
59
60 let endpoint_shutdown_token = cancellation_token.child_token();
62
63 let secondary = match secondary {
65 Some(secondary) => secondary,
66 None => {
67 tracing::debug!("Created secondary runtime with single thread");
68 RuntimeType::Shared(Arc::new(ManuallyDrop::new(
69 RuntimeConfig::single_threaded().create_runtime()?,
70 )))
71 }
72 };
73
74 let compute_pool = None;
77 let block_in_place_permits = None;
78
79 Ok(Runtime {
80 id,
81 primary: runtime,
82 secondary,
83 cancellation_token,
84 endpoint_shutdown_token,
85 graceful_shutdown_tracker: Arc::new(GracefulShutdownTracker::new()),
86 compute_pool,
87 block_in_place_permits,
88 })
89 }
90
91 fn new_with_config(
92 runtime: RuntimeType,
93 secondary: Option<RuntimeType>,
94 config: &RuntimeConfig,
95 ) -> anyhow::Result<Runtime> {
96 let mut rt = Self::new(runtime, secondary)?;
97
98 let compute_config = crate::compute::ComputeConfig {
100 num_threads: config.compute_threads,
101 stack_size: config.compute_stack_size,
102 thread_prefix: config.compute_thread_prefix.clone(),
103 pin_threads: false,
104 };
105
106 if config.compute_threads == Some(0) {
108 tracing::info!("Compute pool disabled (compute_threads = 0)");
109 } else {
110 match crate::compute::ComputePool::new(compute_config) {
111 Ok(pool) => {
112 rt.compute_pool = Some(Arc::new(pool));
113 tracing::debug!(
114 "Initialized compute pool with {} threads",
115 rt.compute_pool.as_ref().unwrap().num_threads()
116 );
117 }
118 Err(e) => {
119 tracing::warn!(
120 "Failed to create compute pool: {}. CPU-intensive operations will use spawn_blocking",
121 e
122 );
123 }
124 }
125 }
126
127 let num_workers = config
129 .num_worker_threads
130 .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get());
131 let permits = num_workers.saturating_sub(1).max(1);
133 rt.block_in_place_permits = Some(Arc::new(tokio::sync::Semaphore::new(permits)));
134 tracing::debug!(
135 "Initialized block_in_place permits: {} (from {} worker threads)",
136 permits,
137 num_workers
138 );
139
140 Ok(rt)
141 }
142
143 pub fn initialize_thread_local(&self) {
146 if let (Some(pool), Some(permits)) = (&self.compute_pool, &self.block_in_place_permits) {
147 crate::compute::thread_local::initialize_context(Arc::clone(pool), Arc::clone(permits));
148 }
149 }
150
151 pub async fn initialize_all_thread_locals(&self) -> anyhow::Result<()> {
154 if let (Some(pool), Some(permits)) = (&self.compute_pool, &self.block_in_place_permits) {
155 let num_workers = self.detect_worker_thread_count().await;
157
158 if num_workers == 0 {
159 return Err(anyhow::anyhow!("No worker threads detected"));
160 }
161
162 let barrier = Arc::new(std::sync::Barrier::new(num_workers));
164 let init_pool = Arc::clone(pool);
165 let init_permits = Arc::clone(permits);
166
167 let mut handles = Vec::new();
169 for i in 0..num_workers {
170 let barrier_clone = Arc::clone(&barrier);
171 let pool_clone = Arc::clone(&init_pool);
172 let permits_clone = Arc::clone(&init_permits);
173
174 let handle = tokio::task::spawn_blocking(move || {
175 barrier_clone.wait();
177
178 crate::compute::thread_local::initialize_context(pool_clone, permits_clone);
180
181 let thread_id = std::thread::current().id();
183 tracing::trace!(
184 "Initialized thread-local compute context on thread {:?} (worker {})",
185 thread_id,
186 i
187 );
188 });
189 handles.push(handle);
190 }
191
192 for handle in handles {
194 handle.await?;
195 }
196
197 tracing::info!(
198 "Successfully initialized thread-local compute context on {} worker threads",
199 num_workers
200 );
201 } else {
202 tracing::debug!("No compute pool configured, skipping thread-local initialization");
203 }
204 Ok(())
205 }
206
207 async fn detect_worker_thread_count(&self) -> usize {
209 use parking_lot::Mutex;
210 use std::collections::HashSet;
211
212 let thread_ids = Arc::new(Mutex::new(HashSet::new()));
213 let mut handles = Vec::new();
214
215 let num_probes = 100;
218 for _ in 0..num_probes {
219 let ids = Arc::clone(&thread_ids);
220 let handle = tokio::task::spawn_blocking(move || {
221 let thread_id = std::thread::current().id();
222 ids.lock().insert(thread_id);
223 });
224 handles.push(handle);
225 }
226
227 for handle in handles {
229 let _ = handle.await;
230 }
231
232 let count = thread_ids.lock().len();
233 tracing::debug!("Detected {} worker threads in runtime", count);
234 count
235 }
236
237 pub fn from_current() -> anyhow::Result<Runtime> {
238 Runtime::from_handle(tokio::runtime::Handle::current())
239 }
240
241 pub fn from_handle(handle: tokio::runtime::Handle) -> anyhow::Result<Runtime> {
242 let primary = RuntimeType::External(handle.clone());
243 let secondary = RuntimeType::External(handle);
244 Runtime::new(primary, Some(secondary))
245 }
246
247 pub fn from_settings() -> anyhow::Result<Runtime> {
250 let config = config::RuntimeConfig::from_settings()?;
251 let runtime = Arc::new(ManuallyDrop::new(config.create_runtime()?));
252 let primary = RuntimeType::Shared(runtime.clone());
253 let secondary = RuntimeType::External(runtime.handle().clone());
254 Runtime::new_with_config(primary, Some(secondary), &config)
255 }
256
257 pub fn single_threaded() -> anyhow::Result<Runtime> {
259 let config = config::RuntimeConfig::single_threaded();
260 let owned = RuntimeType::Shared(Arc::new(ManuallyDrop::new(config.create_runtime()?)));
261 Runtime::new(owned, None)
262 }
263
264 pub fn id(&self) -> &str {
266 &self.id
267 }
268
269 pub fn primary(&self) -> tokio::runtime::Handle {
271 self.primary.handle()
272 }
273
274 pub fn secondary(&self) -> tokio::runtime::Handle {
276 self.secondary.handle()
277 }
278
279 pub fn primary_token(&self) -> CancellationToken {
281 self.cancellation_token.clone()
282 }
283
284 pub fn child_token(&self) -> CancellationToken {
286 self.endpoint_shutdown_token.child_token()
287 }
288
289 pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
291 self.graceful_shutdown_tracker.clone()
292 }
293
294 pub fn compute_pool(&self) -> Option<&Arc<crate::compute::ComputePool>> {
298 self.compute_pool.as_ref()
299 }
300
301 pub fn shutdown(&self) {
303 tracing::info!("Runtime shutdown initiated");
304
305 let tracker = self.graceful_shutdown_tracker.clone();
307 let main_token = self.cancellation_token.clone();
308 let endpoint_token = self.endpoint_shutdown_token.clone();
309
310 let handle = self.primary();
312 handle.spawn(async move {
313 tracing::info!("Phase 1: Cancelling endpoint shutdown token");
315 endpoint_token.cancel();
316
317 tracing::info!("Phase 2: Waiting for graceful endpoints to complete");
319
320 let count = tracker.get_count();
321 tracing::info!("Active graceful endpoints: {}", count);
322
323 if count != 0 {
324 tracker.wait_for_completion().await;
325 }
326
327 tracing::info!(
329 "Phase 3: All endpoints ended gracefully. Connections to backend services will now be disconnected"
330 );
331 main_token.cancel();
332 });
333 }
334}
335
336impl RuntimeType {
337 pub fn handle(&self) -> tokio::runtime::Handle {
339 match self {
340 RuntimeType::External(rt) => rt.clone(),
341 RuntimeType::Shared(rt) => rt.handle().clone(),
342 }
343 }
344}
345
impl Drop for RuntimeType {
    /// Tears down an owned runtime when the last `Shared` reference is dropped.
    ///
    /// - `External` handles are left untouched; that runtime is owned elsewhere.
    /// - For `Shared`, teardown happens only if `Arc::get_mut` succeeds, i.e.
    ///   this is the sole strong reference and there are no weak ones.
    ///   Otherwise a remaining clone is left to do it.
    /// - Inside a tokio context (`Handle::try_current()` is `Ok`), the runtime
    ///   is stopped with `shutdown_background()`. Per tokio's docs, dropping a
    ///   runtime from an async context panics. Outside such a context it is
    ///   dropped normally.
    ///
    /// NOTE(review): if the last two clones are dropped concurrently on
    /// different threads, both may see a strong count > 1 and return early.
    /// The `Arc` would then free the `ManuallyDrop` without tearing the
    /// runtime down, leaking it. Confirm whether concurrent final drops can
    /// happen here.
    fn drop(&mut self) {
        match self {
            RuntimeType::External(_) => {}
            RuntimeType::Shared(arc) => {
                // Not the unique owner: leave teardown to the last clone.
                let Some(md_runtime) = Arc::get_mut(arc) else {
                    return;
                };
                if tokio::runtime::Handle::try_current().is_ok() {
                    // SAFETY: `Arc::get_mut` proved this is the only reference,
                    // and we are inside `drop`, so `md_runtime` is never read
                    // again. When the `Arc` is freed afterwards, `ManuallyDrop`
                    // does not drop the (already moved-out) value a second time.
                    let tokio_runtime = unsafe { ManuallyDrop::take(md_runtime) };
                    tokio_runtime.shutdown_background();
                } else {
                    // SAFETY: same uniqueness argument as above. The inner runtime
                    // is dropped exactly once here, and `ManuallyDrop` prevents
                    // any further drop when the `Arc` allocation is released.
                    unsafe { ManuallyDrop::drop(md_runtime) };
                }
            }
        }
    }
}