1use std::sync::atomic::AtomicU8;
23use std::sync::atomic::Ordering;
24use std::sync::Arc;
25use std::time::Duration;
26
27use cheetah_string::CheetahString;
28use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
29use rocketmq_common::common::server::config::ServerConfig;
30use rocketmq_common::utils::network_util::NetworkUtil;
31use rocketmq_error::RocketMQError;
32use rocketmq_error::RocketMQResult;
33use rocketmq_remoting::base::channel_event_listener::ChannelEventListener;
34use rocketmq_remoting::clients::rocketmq_tokio_client::RocketmqDefaultClient;
35use rocketmq_remoting::clients::RemotingClient;
36use rocketmq_remoting::code::request_code::RequestCode;
37use rocketmq_remoting::remoting::RemotingService;
38use rocketmq_remoting::remoting_server::rocketmq_tokio_server::RocketMQServer;
39use rocketmq_remoting::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
40use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
41use rocketmq_rust::schedule::simple_scheduler::ScheduledTaskManager;
42use rocketmq_rust::wait_for_signal;
43use rocketmq_rust::ArcMut;
44use tokio::sync::broadcast;
45use tracing::debug;
46use tracing::error;
47use tracing::info;
48use tracing::instrument;
49use tracing::warn;
50
51use crate::processor::ClientRequestProcessor;
52use crate::processor::NameServerRequestProcessor;
53use crate::processor::NameServerRequestProcessorWrapper;
54use crate::route::route_info_manager::RouteInfoManager;
55use crate::route::route_info_manager_v2::RouteInfoManagerV2;
56use crate::route::route_info_manager_wrapper::RouteInfoManagerWrapper;
57use crate::route::zone_route_rpc_hook::ZoneRouteRPCHook;
58use crate::route_info::broker_housekeeping_service::BrokerHousekeepingService;
59use crate::KVConfigManager;
60
/// Lifecycle state of a `NameServerRuntime`.
///
/// Stored as its `u8` discriminant inside an `AtomicU8`, hence the explicit `repr(u8)` and
/// explicit discriminant values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum RuntimeState {
    /// Built but not yet initialized.
    Created = 0,
    /// Configuration loaded, server created, scheduled tasks registered.
    Initialized = 1,
    /// Server task spawned and accepting requests.
    Running = 2,
    /// Graceful shutdown in progress.
    ShuttingDown = 3,
    /// Terminal state; no further transitions are allowed.
    Stopped = 4,
}
84
85impl RuntimeState {
86 #[inline]
88 fn from_u8(value: u8) -> Option<Self> {
89 match value {
90 0 => Some(Self::Created),
91 1 => Some(Self::Initialized),
92 2 => Some(Self::Running),
93 3 => Some(Self::ShuttingDown),
94 4 => Some(Self::Stopped),
95 _ => None,
96 }
97 }
98
99 #[inline]
101 fn name(&self) -> &'static str {
102 match self {
103 Self::Created => "Created",
104 Self::Initialized => "Initialized",
105 Self::Running => "Running",
106 Self::ShuttingDown => "ShuttingDown",
107 Self::Stopped => "Stopped",
108 }
109 }
110
111 #[inline]
113 fn can_transition_to(&self, next: RuntimeState) -> bool {
114 match (self, next) {
115 (Self::Created, Self::Initialized) => true,
117 (Self::Created, Self::Stopped) => true,
118
119 (Self::Initialized, Self::Running) => true,
121 (Self::Initialized, Self::Stopped) => true,
122
123 (Self::Running, Self::ShuttingDown) => true,
125
126 (Self::ShuttingDown, Self::Stopped) => true,
128
129 (Self::Stopped, _) => false,
131
132 _ => false,
134 }
135 }
136}
137
138impl std::fmt::Display for RuntimeState {
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 write!(f, "{}", self.name())
141 }
142}
143
144pub struct NameServerBootstrap {
145 name_server_runtime: NameServerRuntime,
146}
147
148pub struct Builder {
150 name_server_config: Option<NamesrvConfig>,
151 server_config: Option<ServerConfig>,
152}
153
154struct NameServerRuntime {
158 inner: ArcMut<NameServerRuntimeInner>,
159 scheduled_task_manager: ScheduledTaskManager,
160 shutdown_rx: Option<broadcast::Receiver<()>>,
161 server_inner: Option<RocketMQServer<NameServerRequestProcessor>>,
162 server_task_handle: Option<tokio::task::JoinHandle<()>>,
164 state: Arc<AtomicU8>,
166}
167
168impl NameServerBootstrap {
169 #[instrument(skip(self), name = "nameserver_boot")]
176 pub async fn boot(mut self) -> RocketMQResult<()> {
177 info!("Booting RocketMQ NameServer (Rust)...");
178
179 let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
180 self.name_server_runtime.shutdown_rx = Some(shutdown_rx);
181 self.name_server_runtime.initialize().await?;
182
183 tokio::join!(
184 self.name_server_runtime.start(),
185 wait_for_signal_inner(shutdown_tx)
186 );
187
188 info!("NameServer shutdown completed");
189 Ok(())
190 }
191}
192
193#[inline]
195async fn wait_for_signal_inner(shutdown_tx: broadcast::Sender<()>) {
196 tokio::select! {
197 _ = wait_for_signal() => {
198 info!("Shutdown signal received, broadcasting to all components...");
199 }
200 }
201 if let Err(e) = shutdown_tx.send(()) {
203 error!("Failed to broadcast shutdown signal: {}", e);
204 }
205}
206
207impl NameServerRuntime {
208 #[inline]
210 fn current_state(&self) -> RuntimeState {
211 let value = self.state.load(Ordering::Acquire);
212 RuntimeState::from_u8(value).unwrap_or_else(|| {
213 error!("Invalid runtime state value: {}", value);
214 RuntimeState::Stopped
215 })
216 }
217
218 #[inline]
222 fn transition_to(&self, next: RuntimeState) -> RocketMQResult<()> {
223 let current = self.current_state();
224
225 if !current.can_transition_to(next) {
226 let error_msg = format!(
227 "Invalid state transition: {} -> {}. Current state does not allow this transition.",
228 current.name(),
229 next.name()
230 );
231 error!("{}", error_msg);
232 return Err(RocketMQError::Internal(error_msg));
233 }
234
235 let old_value = self.state.swap(next as u8, Ordering::AcqRel);
237 let old_state = RuntimeState::from_u8(old_value).unwrap_or(RuntimeState::Stopped);
238
239 info!("State transition: {} -> {}", old_state.name(), next.name());
240
241 Ok(())
242 }
243
244 #[inline]
246 fn validate_state(&self, expected: &[RuntimeState], operation: &str) -> RocketMQResult<()> {
247 let current = self.current_state();
248
249 if !expected.contains(¤t) {
250 let expected_names: Vec<_> = expected.iter().map(|s| s.name()).collect();
251 let error_msg = format!(
252 "Operation '{}' requires state to be one of [{}], but current state is {}",
253 operation,
254 expected_names.join(", "),
255 current.name()
256 );
257 error!("{}", error_msg);
258 return Err(RocketMQError::Internal(error_msg));
259 }
260
261 Ok(())
262 }
263
264 #[instrument(skip(self), name = "runtime_initialize")]
272 pub async fn initialize(&mut self) -> RocketMQResult<()> {
273 self.validate_state(&[RuntimeState::Created], "initialize")?;
275
276 info!("Phase 1/4: Loading configuration...");
277 if let Err(e) = self.load_config().await {
278 error!("Initialization failed during config load: {}", e);
279 let _ = self.transition_to(RuntimeState::Stopped);
280 return Err(e);
281 }
282
283 info!("Phase 2/4: Initializing network server...");
284 self.initialize_network_components();
285
286 info!("Phase 3/4: Registering RPC hooks...");
287 self.initialize_rpc_hooks();
288
289 info!("Phase 4/4: Starting scheduled tasks...");
290 self.start_schedule_service();
291
292 self.transition_to(RuntimeState::Initialized)?;
294
295 info!("Initialization completed successfully");
296 Ok(())
297 }
298
299 async fn load_config(&mut self) -> RocketMQResult<()> {
300 self.inner.kvconfig_manager_mut().load().map_err(|e| {
302 error!("KV config load failed: {}", e);
303 RocketMQError::storage_read_failed(
304 "kv_config",
305 format!("Configuration load error: {}", e),
306 )
307 })?;
308 debug!("KV configuration loaded successfully");
309 Ok(())
310 }
311
312 fn initialize_network_components(&mut self) {
314 let server = RocketMQServer::new(Arc::new(self.inner.server_config().clone()));
315 self.server_inner = Some(server);
316 debug!(
317 "Network server initialized on port {}",
318 self.inner.server_config().listen_port
319 );
320 }
321
322 fn start_schedule_service(&self) {
326 let scan_not_active_broker_interval = self
327 .inner
328 .name_server_config()
329 .scan_not_active_broker_interval;
330 let mut name_server_runtime_inner = self.inner.clone();
331
332 self.scheduled_task_manager.add_fixed_rate_task_async(
333 Duration::from_secs(5), Duration::from_millis(scan_not_active_broker_interval),
335 async move |_ctx| {
336 debug!("Running scheduled broker health check");
337 if let Some(route_info_manager) =
338 name_server_runtime_inner.route_info_manager.as_mut()
339 {
340 route_info_manager.scan_not_active_broker();
341 }
342 Ok(())
343 },
344 );
345
346 info!(
347 "Scheduled task started: broker health check (interval: {}ms)",
348 scan_not_active_broker_interval
349 );
350 }
351
352 fn initialize_rpc_hooks(&mut self) {
354 if let Some(server) = self.server_inner.as_mut() {
355 server.register_rpc_hook(Arc::new(ZoneRouteRPCHook));
356 debug!("RPC hooks registered: ZoneRouteRPCHook");
357 }
358 }
359
360 #[instrument(skip(self), name = "runtime_start")]
369 pub async fn start(&mut self) {
370 if let Err(e) = self.validate_state(&[RuntimeState::Initialized], "start") {
372 error!("Cannot start: {}", e);
373 return;
374 }
375
376 info!("Starting NameServer main loop...");
377
378 let request_processor = self.init_processors();
379
380 let mut server = self
382 .server_inner
383 .take()
384 .expect("Server not initialized - call initialize() first");
385
386 self.inner.route_info_manager().start();
388
389 let channel_event_listener =
391 Some(self.inner.broker_housekeeping_service().clone() as Arc<dyn ChannelEventListener>);
392
393 let server_handle = tokio::spawn(async move {
395 debug!("Server task started");
396 server.run(request_processor, channel_event_listener).await;
397 debug!("Server task completed");
398 });
399 self.server_task_handle = Some(server_handle);
400
401 let local_address = NetworkUtil::get_local_address().unwrap_or_else(|| {
403 warn!("Failed to determine local address, using 127.0.0.1");
404 "127.0.0.1".to_string()
405 });
406
407 let namesrv = CheetahString::from_string(format!(
408 "{}:{}",
409 local_address,
410 self.inner.server_config().listen_port
411 ));
412
413 debug!("NameServer address: {}", namesrv);
414
415 let weak_arc_mut = ArcMut::downgrade(&self.inner.remoting_client);
416 self.inner
417 .remoting_client
418 .update_name_server_address_list(vec![namesrv])
419 .await;
420
421 self.inner.remoting_client.start(weak_arc_mut).await;
423
424 if let Err(e) = self.transition_to(RuntimeState::Running) {
426 error!("Failed to transition to Running state: {}", e);
427 return;
428 }
429
430 info!("NameServer is now running and accepting requests");
431
432 tokio::select! {
434 result = self.shutdown_rx.as_mut()
435 .expect("Shutdown channel not initialized")
436 .recv() => {
437 match result {
438 Ok(_) => info!("Shutdown signal received, initiating graceful shutdown..."),
439 Err(e) => error!("Error receiving shutdown signal: {}", e),
440 }
441 self.shutdown().await;
442 }
443 }
444 }
445
446 #[instrument(skip(self), name = "runtime_shutdown")]
455 async fn shutdown(&mut self) {
456 if let Err(e) = self.validate_state(&[RuntimeState::Running], "shutdown") {
458 warn!("Shutdown called in unexpected state: {}", e);
459 }
461
462 if let Err(e) = self.transition_to(RuntimeState::ShuttingDown) {
463 error!("Failed to transition to ShuttingDown state: {}", e);
464 }
465
466 const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
467 const TASK_JOIN_TIMEOUT: Duration = Duration::from_secs(10);
468
469 info!(
470 "Phase 1/4: Waiting for in-flight requests (timeout: {}s)...",
471 SHUTDOWN_TIMEOUT.as_secs()
472 );
473 if let Err(e) = self.wait_for_inflight_requests(SHUTDOWN_TIMEOUT).await {
474 warn!("In-flight request wait timeout or error: {}", e);
475 }
476
477 info!("Phase 2/4: Stopping scheduled tasks...");
478 self.scheduled_task_manager.cancel_all();
479
480 info!("Phase 3/4: Shutting down route info manager...");
481 self.inner
482 .route_info_manager_mut()
483 .shutdown_unregister_service();
484
485 info!(
486 "Phase 4/4: Waiting for server task (timeout: {}s)...",
487 TASK_JOIN_TIMEOUT.as_secs()
488 );
489 if let Err(e) = self.wait_for_server_task(TASK_JOIN_TIMEOUT).await {
490 warn!("Task join timeout or error: {}", e);
491 }
492
493 if let Err(e) = self.transition_to(RuntimeState::Stopped) {
495 error!("Failed to transition to Stopped state: {}", e);
496 }
497
498 info!("Graceful shutdown completed");
499 }
500
501 #[instrument(skip(self), name = "wait_inflight_requests")]
506 async fn wait_for_inflight_requests(&self, timeout: Duration) -> RocketMQResult<()> {
507 tokio::time::timeout(timeout, async {
508 tokio::time::sleep(Duration::from_millis(500)).await;
514 debug!("In-flight request grace period completed");
515 })
516 .await
517 .map_err(|_| RocketMQError::Timeout {
518 operation: "wait_for_inflight_requests",
519 timeout_ms: timeout.as_millis() as u64,
520 })
521 }
522
523 #[instrument(skip(self), name = "wait_server_task")]
528 async fn wait_for_server_task(&mut self, timeout: Duration) -> RocketMQResult<()> {
529 if let Some(handle) = self.server_task_handle.take() {
530 match tokio::time::timeout(timeout, handle).await {
531 Ok(Ok(())) => {
532 debug!("Server task completed successfully");
533 Ok(())
534 }
535 Ok(Err(e)) => {
536 error!("Server task panicked: {}", e);
537 Err(RocketMQError::Internal(format!(
538 "Server task panicked: {}",
539 e
540 )))
541 }
542 Err(_) => {
543 warn!(
544 "Server task join timeout ({}s), task may still be running",
545 timeout.as_secs()
546 );
547 Err(RocketMQError::Timeout {
548 operation: "server_task_join",
549 timeout_ms: timeout.as_millis() as u64,
550 })
551 }
552 }
553 } else {
554 debug!("No server task handle to wait for");
555 Ok(())
556 }
557 }
558
559 #[inline]
565 fn init_processors(&self) -> NameServerRequestProcessor {
566 let client_request_processor = ClientRequestProcessor::new(self.inner.clone());
567 let default_request_processor =
568 crate::processor::default_request_processor::DefaultRequestProcessor::new(
569 self.inner.clone(),
570 );
571
572 let mut name_server_request_processor = NameServerRequestProcessor::new();
573
574 name_server_request_processor.register_processor(
576 RequestCode::GetRouteinfoByTopic,
577 NameServerRequestProcessorWrapper::ClientRequestProcessor(ArcMut::new(
578 client_request_processor,
579 )),
580 );
581
582 name_server_request_processor.register_default_processor(
584 NameServerRequestProcessorWrapper::DefaultRequestProcessor(ArcMut::new(
585 default_request_processor,
586 )),
587 );
588
589 debug!("Request processor pipeline configured");
590 name_server_request_processor
591 }
592}
593
594impl Drop for NameServerRuntime {
595 #[inline]
596 fn drop(&mut self) {
597 let current_state = self.current_state();
598 debug!("NameServerRuntime dropped in state: {}", current_state);
599
600 if current_state != RuntimeState::Stopped {
602 warn!(
603 "NameServerRuntime dropped without proper shutdown (current state: {}). This may \
604 indicate a panic or abnormal termination.",
605 current_state
606 );
607 }
608 }
609}
610
611impl Default for Builder {
612 #[inline]
613 fn default() -> Self {
614 Self::new()
615 }
616}
617
618impl Builder {
619 #[inline]
620 pub fn new() -> Self {
621 Builder {
622 name_server_config: None,
623 server_config: None,
624 }
625 }
626
627 #[inline]
628 pub fn set_name_server_config(mut self, name_server_config: NamesrvConfig) -> Self {
629 self.name_server_config = Some(name_server_config);
630 self
631 }
632
633 #[inline]
634 pub fn set_server_config(mut self, server_config: ServerConfig) -> Self {
635 self.server_config = Some(server_config);
636 self
637 }
638
639 #[instrument(skip(self), name = "build_bootstrap")]
643 pub fn build(self) -> NameServerBootstrap {
644 let name_server_config = self.name_server_config.unwrap_or_default();
645 let tokio_client_config = TokioClientConfig::default();
646 let server_config = self.server_config.unwrap_or_default();
647
648 info!("Building NameServer with configuration:");
649 info!(" - Listen port: {}", server_config.listen_port);
650 info!(
651 " - Scan interval: {}ms",
652 name_server_config.scan_not_active_broker_interval
653 );
654 info!(
655 " - Use V2 RouteManager: {}",
656 name_server_config.use_route_info_manager_v2
657 );
658
659 let remoting_client = ArcMut::new(RocketmqDefaultClient::new(
661 Arc::new(tokio_client_config.clone()),
662 DefaultRemotingRequestProcessor,
663 ));
664
665 let mut inner = ArcMut::new(NameServerRuntimeInner {
667 name_server_config: name_server_config.clone(),
668 tokio_client_config: tokio_client_config.clone(),
669 server_config: server_config.clone(),
670 route_info_manager: None,
671 kvconfig_manager: None,
672 remoting_client,
673 broker_housekeeping_service: None,
674 });
675
676 let use_v2 = name_server_config.use_route_info_manager_v2;
678
679 let route_info_manager = if use_v2 {
681 info!("Using RouteInfoManager V2 (DashMap-based, 5-50x faster)");
682 RouteInfoManagerWrapper::V2(Box::new(RouteInfoManagerV2::new(inner.clone())))
683 } else {
684 warn!("Using RouteInfoManager V1 (legacy). Consider V2 for better performance.");
685 RouteInfoManagerWrapper::V1(Box::new(RouteInfoManager::new(inner.clone())))
686 };
687
688 let kvconfig_manager = KVConfigManager::new(inner.clone());
689 let broker_housekeeping_service = Arc::new(BrokerHousekeepingService::new(inner.clone()));
690
691 inner.route_info_manager = Some(route_info_manager);
693 inner.kvconfig_manager = Some(kvconfig_manager);
694 inner.broker_housekeeping_service = Some(broker_housekeeping_service);
695
696 info!("NameServer bootstrap built successfully");
697
698 NameServerBootstrap {
699 name_server_runtime: NameServerRuntime {
700 inner,
701 scheduled_task_manager: ScheduledTaskManager::new(),
702 shutdown_rx: None,
703 server_inner: None,
704 server_task_handle: None,
705 state: Arc::new(AtomicU8::new(RuntimeState::Created as u8)),
706 },
707 }
708 }
709}
710
711pub(crate) struct NameServerRuntimeInner {
716 name_server_config: NamesrvConfig,
718 tokio_client_config: TokioClientConfig,
719 server_config: ServerConfig,
720
721 route_info_manager: Option<RouteInfoManagerWrapper>,
723 kvconfig_manager: Option<KVConfigManager>,
724 remoting_client: ArcMut<RocketmqDefaultClient>,
725 broker_housekeeping_service: Option<Arc<BrokerHousekeepingService>>,
726}
727
728impl NameServerRuntimeInner {
729 #[inline]
732 pub fn name_server_config(&self) -> &NamesrvConfig {
733 &self.name_server_config
734 }
735
736 #[inline]
737 pub fn name_server_config_mut(&mut self) -> &mut NamesrvConfig {
738 &mut self.name_server_config
739 }
740
741 #[inline]
742 pub fn tokio_client_config(&self) -> &TokioClientConfig {
743 &self.tokio_client_config
744 }
745
746 #[inline]
747 pub fn server_config(&self) -> &ServerConfig {
748 &self.server_config
749 }
750
751 #[inline]
754 pub fn route_info_manager(&self) -> &RouteInfoManagerWrapper {
755 self.route_info_manager
756 .as_ref()
757 .expect("RouteInfoManager not initialized")
758 }
759
760 #[inline]
761 pub fn route_info_manager_mut(&mut self) -> &mut RouteInfoManagerWrapper {
762 self.route_info_manager
763 .as_mut()
764 .expect("RouteInfoManager not initialized")
765 }
766
767 #[inline]
768 pub fn kvconfig_manager(&self) -> &KVConfigManager {
769 self.kvconfig_manager
770 .as_ref()
771 .expect("KVConfigManager not initialized")
772 }
773
774 #[inline]
775 pub fn kvconfig_manager_mut(&mut self) -> &mut KVConfigManager {
776 self.kvconfig_manager
777 .as_mut()
778 .expect("KVConfigManager not initialized")
779 }
780
781 #[inline]
782 pub fn remoting_client(&self) -> &RocketmqDefaultClient {
783 &self.remoting_client
784 }
785
786 #[inline]
787 pub fn remoting_client_mut(&mut self) -> &mut RocketmqDefaultClient {
788 &mut self.remoting_client
789 }
790
791 #[inline]
792 pub fn broker_housekeeping_service(&self) -> &Arc<BrokerHousekeepingService> {
793 self.broker_housekeeping_service
794 .as_ref()
795 .expect("BrokerHousekeepingService not initialized")
796 }
797}