dynamo_runtime/
runtime.rs1use super::utils::GracefulShutdownTracker;
17use super::{Result, Runtime, RuntimeType, error};
18use crate::config::{self, RuntimeConfig};
19
20use futures::Future;
21use once_cell::sync::OnceCell;
22use std::sync::{Arc, atomic::Ordering};
23use tokio::{signal, sync::Mutex, task::JoinHandle};
24
25pub use tokio_util::sync::CancellationToken;
26
27impl Runtime {
28    fn new(runtime: RuntimeType, secondary: Option<RuntimeType>) -> Result<Runtime> {
29        let id = Arc::new(uuid::Uuid::new_v4().to_string());
31
32        let cancellation_token = CancellationToken::new();
34
35        let endpoint_shutdown_token = cancellation_token.child_token();
37
38        let secondary = match secondary {
40            Some(secondary) => secondary,
41            None => {
42                tracing::debug!("Created secondary runtime with single thread");
43                RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?))
44            }
45        };
46
47        Ok(Runtime {
48            id,
49            primary: runtime,
50            secondary,
51            cancellation_token,
52            endpoint_shutdown_token,
53            graceful_shutdown_tracker: Arc::new(GracefulShutdownTracker::new()),
54        })
55    }
56
57    pub fn from_current() -> Result<Runtime> {
58        Runtime::from_handle(tokio::runtime::Handle::current())
59    }
60
61    pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
62        let primary = RuntimeType::External(handle.clone());
63        let secondary = RuntimeType::External(handle);
64        Runtime::new(primary, Some(secondary))
65    }
66
67    pub fn from_settings() -> Result<Runtime> {
70        let config = config::RuntimeConfig::from_settings()?;
71        let runtime = Arc::new(config.create_runtime()?);
72        let primary = RuntimeType::Shared(runtime.clone());
73        let secondary = RuntimeType::External(runtime.handle().clone());
74        Runtime::new(primary, Some(secondary))
75    }
76
77    pub fn single_threaded() -> Result<Runtime> {
79        let config = config::RuntimeConfig::single_threaded();
80        let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
81        Runtime::new(owned, None)
82    }
83
84    pub fn id(&self) -> &str {
86        &self.id
87    }
88
89    pub fn primary(&self) -> tokio::runtime::Handle {
91        self.primary.handle()
92    }
93
94    pub fn secondary(&self) -> tokio::runtime::Handle {
96        self.secondary.handle()
97    }
98
99    pub fn primary_token(&self) -> CancellationToken {
101        self.cancellation_token.clone()
102    }
103
104    pub fn child_token(&self) -> CancellationToken {
106        self.endpoint_shutdown_token.child_token()
107    }
108
109    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
111        self.graceful_shutdown_tracker.clone()
112    }
113
114    pub fn shutdown(&self) {
116        tracing::info!("Runtime shutdown initiated");
117
118        let tracker = self.graceful_shutdown_tracker.clone();
120        let main_token = self.cancellation_token.clone();
121        let endpoint_token = self.endpoint_shutdown_token.clone();
122
123        let handle = self.primary();
125        handle.spawn(async move {
126            tracing::info!("Phase 1: Cancelling endpoint shutdown token");
128            endpoint_token.cancel();
129
130            tracing::info!("Phase 2: Waiting for graceful endpoints to complete");
132
133            let count = tracker.get_count();
134            tracing::info!("Active graceful endpoints: {}", count);
135
136            if count != 0 {
137                tracker.wait_for_completion().await;
138            }
139
140            tracing::info!(
142                "Phase 3: All graceful endpoints completed, shutting down NATS/ETCD connections"
143            );
144            main_token.cancel();
145        });
146    }
147}
148
149impl RuntimeType {
150    pub fn handle(&self) -> tokio::runtime::Handle {
152        match self {
153            RuntimeType::External(rt) => rt.clone(),
154            RuntimeType::Shared(rt) => rt.handle().clone(),
155        }
156    }
157}
158
159impl std::fmt::Debug for RuntimeType {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        match self {
162            RuntimeType::External(_) => write!(f, "RuntimeType::External"),
163            RuntimeType::Shared(_) => write!(f, "RuntimeType::Shared"),
164        }
165    }
166}