dynamo_runtime/
runtime.rs1use super::utils::GracefulShutdownTracker;
17use super::{Result, Runtime, RuntimeType, error};
18use crate::config::{self, RuntimeConfig};
19
20use futures::Future;
21use once_cell::sync::OnceCell;
22use std::sync::{Arc, atomic::Ordering};
23use tokio::{signal, sync::Mutex, task::JoinHandle};
24
25pub use tokio_util::sync::CancellationToken;
26
27impl Runtime {
28 fn new(runtime: RuntimeType, secondary: Option<RuntimeType>) -> Result<Runtime> {
29 let id = Arc::new(uuid::Uuid::new_v4().to_string());
31
32 let cancellation_token = CancellationToken::new();
34
35 let endpoint_shutdown_token = cancellation_token.child_token();
37
38 let secondary = match secondary {
40 Some(secondary) => secondary,
41 None => {
42 tracing::debug!("Created secondary runtime with single thread");
43 RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?))
44 }
45 };
46
47 let compute_pool = None;
50 let block_in_place_permits = None;
51
52 Ok(Runtime {
53 id,
54 primary: runtime,
55 secondary,
56 cancellation_token,
57 endpoint_shutdown_token,
58 graceful_shutdown_tracker: Arc::new(GracefulShutdownTracker::new()),
59 compute_pool,
60 block_in_place_permits,
61 })
62 }
63
64 fn new_with_config(
65 runtime: RuntimeType,
66 secondary: Option<RuntimeType>,
67 config: &RuntimeConfig,
68 ) -> Result<Runtime> {
69 let mut rt = Self::new(runtime, secondary)?;
70
71 let compute_config = crate::compute::ComputeConfig {
73 num_threads: config.compute_threads,
74 stack_size: config.compute_stack_size,
75 thread_prefix: config.compute_thread_prefix.clone(),
76 pin_threads: false,
77 };
78
79 if config.compute_threads == Some(0) {
81 tracing::info!("Compute pool disabled (compute_threads = 0)");
82 } else {
83 match crate::compute::ComputePool::new(compute_config) {
84 Ok(pool) => {
85 rt.compute_pool = Some(Arc::new(pool));
86 tracing::debug!(
87 "Initialized compute pool with {} threads",
88 rt.compute_pool.as_ref().unwrap().num_threads()
89 );
90 }
91 Err(e) => {
92 tracing::warn!(
93 "Failed to create compute pool: {}. CPU-intensive operations will use spawn_blocking",
94 e
95 );
96 }
97 }
98 }
99
100 let num_workers = config
102 .num_worker_threads
103 .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get());
104 let permits = num_workers.saturating_sub(1).max(1);
106 rt.block_in_place_permits = Some(Arc::new(tokio::sync::Semaphore::new(permits)));
107 tracing::debug!(
108 "Initialized block_in_place permits: {} (from {} worker threads)",
109 permits,
110 num_workers
111 );
112
113 Ok(rt)
114 }
115
116 pub fn initialize_thread_local(&self) {
119 if let (Some(pool), Some(permits)) = (&self.compute_pool, &self.block_in_place_permits) {
120 crate::compute::thread_local::initialize_context(Arc::clone(pool), Arc::clone(permits));
121 }
122 }
123
124 pub async fn initialize_all_thread_locals(&self) -> Result<()> {
127 if let (Some(pool), Some(permits)) = (&self.compute_pool, &self.block_in_place_permits) {
128 let num_workers = self.detect_worker_thread_count().await;
130
131 if num_workers == 0 {
132 return Err(anyhow::anyhow!("No worker threads detected"));
133 }
134
135 let barrier = Arc::new(std::sync::Barrier::new(num_workers));
137 let init_pool = Arc::clone(pool);
138 let init_permits = Arc::clone(permits);
139
140 let mut handles = Vec::new();
142 for i in 0..num_workers {
143 let barrier_clone = Arc::clone(&barrier);
144 let pool_clone = Arc::clone(&init_pool);
145 let permits_clone = Arc::clone(&init_permits);
146
147 let handle = tokio::task::spawn_blocking(move || {
148 barrier_clone.wait();
150
151 crate::compute::thread_local::initialize_context(pool_clone, permits_clone);
153
154 let thread_id = std::thread::current().id();
156 tracing::trace!(
157 "Initialized thread-local compute context on thread {:?} (worker {})",
158 thread_id,
159 i
160 );
161 });
162 handles.push(handle);
163 }
164
165 for handle in handles {
167 handle.await?;
168 }
169
170 tracing::info!(
171 "Successfully initialized thread-local compute context on {} worker threads",
172 num_workers
173 );
174 } else {
175 tracing::debug!("No compute pool configured, skipping thread-local initialization");
176 }
177 Ok(())
178 }
179
180 async fn detect_worker_thread_count(&self) -> usize {
182 use std::collections::HashSet;
183 use std::sync::Mutex;
184
185 let thread_ids = Arc::new(Mutex::new(HashSet::new()));
186 let mut handles = Vec::new();
187
188 let num_probes = 100;
191 for _ in 0..num_probes {
192 let ids = Arc::clone(&thread_ids);
193 let handle = tokio::task::spawn_blocking(move || {
194 let thread_id = std::thread::current().id();
195 ids.lock().unwrap().insert(thread_id);
196 });
197 handles.push(handle);
198 }
199
200 for handle in handles {
202 let _ = handle.await;
203 }
204
205 let count = thread_ids.lock().unwrap().len();
206 tracing::debug!("Detected {} worker threads in runtime", count);
207 count
208 }
209
210 pub fn from_current() -> Result<Runtime> {
211 Runtime::from_handle(tokio::runtime::Handle::current())
212 }
213
214 pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
215 let primary = RuntimeType::External(handle.clone());
216 let secondary = RuntimeType::External(handle);
217 Runtime::new(primary, Some(secondary))
218 }
219
220 pub fn from_settings() -> Result<Runtime> {
223 let config = config::RuntimeConfig::from_settings()?;
224 let runtime = Arc::new(config.create_runtime()?);
225 let primary = RuntimeType::Shared(runtime.clone());
226 let secondary = RuntimeType::External(runtime.handle().clone());
227 Runtime::new_with_config(primary, Some(secondary), &config)
228 }
229
230 pub fn single_threaded() -> Result<Runtime> {
232 let config = config::RuntimeConfig::single_threaded();
233 let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
234 Runtime::new(owned, None)
235 }
236
237 pub fn id(&self) -> &str {
239 &self.id
240 }
241
242 pub fn primary(&self) -> tokio::runtime::Handle {
244 self.primary.handle()
245 }
246
247 pub fn secondary(&self) -> tokio::runtime::Handle {
249 self.secondary.handle()
250 }
251
252 pub fn primary_token(&self) -> CancellationToken {
254 self.cancellation_token.clone()
255 }
256
257 pub fn child_token(&self) -> CancellationToken {
259 self.endpoint_shutdown_token.child_token()
260 }
261
262 pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
264 self.graceful_shutdown_tracker.clone()
265 }
266
267 pub fn compute_pool(&self) -> Option<&Arc<crate::compute::ComputePool>> {
271 self.compute_pool.as_ref()
272 }
273
274 pub fn shutdown(&self) {
276 tracing::info!("Runtime shutdown initiated");
277
278 let tracker = self.graceful_shutdown_tracker.clone();
280 let main_token = self.cancellation_token.clone();
281 let endpoint_token = self.endpoint_shutdown_token.clone();
282
283 let handle = self.primary();
285 handle.spawn(async move {
286 tracing::info!("Phase 1: Cancelling endpoint shutdown token");
288 endpoint_token.cancel();
289
290 tracing::info!("Phase 2: Waiting for graceful endpoints to complete");
292
293 let count = tracker.get_count();
294 tracing::info!("Active graceful endpoints: {}", count);
295
296 if count != 0 {
297 tracker.wait_for_completion().await;
298 }
299
300 tracing::info!(
302 "Phase 3: All graceful endpoints completed, shutting down NATS/ETCD connections"
303 );
304 main_token.cancel();
305 });
306 }
307}
308
309impl RuntimeType {
310 pub fn handle(&self) -> tokio::runtime::Handle {
312 match self {
313 RuntimeType::External(rt) => rt.clone(),
314 RuntimeType::Shared(rt) => rt.handle().clone(),
315 }
316 }
317}
318
319impl std::fmt::Debug for RuntimeType {
320 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321 match self {
322 RuntimeType::External(_) => write!(f, "RuntimeType::External"),
323 RuntimeType::Shared(_) => write!(f, "RuntimeType::Shared"),
324 }
325 }
326}