rocketmq_namesrv/
bootstrap.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18//! NameServer Bootstrap Module
19//!
20//! Provides the core runtime infrastructure for RocketMQ NameServer.
21
22use std::sync::atomic::AtomicU8;
23use std::sync::atomic::Ordering;
24use std::sync::Arc;
25use std::time::Duration;
26
27use cheetah_string::CheetahString;
28use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
29use rocketmq_common::common::server::config::ServerConfig;
30use rocketmq_common::utils::network_util::NetworkUtil;
31use rocketmq_error::RocketMQError;
32use rocketmq_error::RocketMQResult;
33use rocketmq_remoting::base::channel_event_listener::ChannelEventListener;
34use rocketmq_remoting::clients::rocketmq_tokio_client::RocketmqDefaultClient;
35use rocketmq_remoting::clients::RemotingClient;
36use rocketmq_remoting::code::request_code::RequestCode;
37use rocketmq_remoting::remoting::RemotingService;
38use rocketmq_remoting::remoting_server::rocketmq_tokio_server::RocketMQServer;
39use rocketmq_remoting::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
40use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
41use rocketmq_rust::schedule::simple_scheduler::ScheduledTaskManager;
42use rocketmq_rust::wait_for_signal;
43use rocketmq_rust::ArcMut;
44use tokio::sync::broadcast;
45use tracing::debug;
46use tracing::error;
47use tracing::info;
48use tracing::instrument;
49use tracing::warn;
50
51use crate::processor::ClientRequestProcessor;
52use crate::processor::NameServerRequestProcessor;
53use crate::processor::NameServerRequestProcessorWrapper;
54use crate::route::route_info_manager::RouteInfoManager;
55use crate::route::route_info_manager_v2::RouteInfoManagerV2;
56use crate::route::route_info_manager_wrapper::RouteInfoManagerWrapper;
57use crate::route::zone_route_rpc_hook::ZoneRouteRPCHook;
58use crate::route_info::broker_housekeeping_service::BrokerHousekeepingService;
59use crate::KVConfigManager;
60
/// Lifecycle phases of the NameServer runtime.
///
/// Each variant has a fixed `u8` discriminant so a phase can round-trip
/// through [`RuntimeState::from_u8`].
///
/// Allowed transitions (see [`RuntimeState::can_transition_to`]):
/// ```text
/// Created -> Initialized -> Running -> ShuttingDown -> Stopped
///    |            |                                       ^
///    +------------+---------------------------------------+
///                  (on error during init/start)
/// ```
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RuntimeState {
    /// Constructed, nothing initialized yet
    Created = 0,
    /// Configuration loaded and components initialized
    Initialized = 1,
    /// Server is up and accepting requests
    Running = 2,
    /// Graceful shutdown has begun
    ShuttingDown = 3,
    /// Terminal state: fully stopped, resources released
    Stopped = 4,
}

impl RuntimeState {
    /// Decode a raw discriminant.
    ///
    /// Returns `None` when `value` does not correspond to any variant.
    #[inline]
    fn from_u8(value: u8) -> Option<Self> {
        const STATES: [RuntimeState; 5] = [
            RuntimeState::Created,
            RuntimeState::Initialized,
            RuntimeState::Running,
            RuntimeState::ShuttingDown,
            RuntimeState::Stopped,
        ];

        STATES.iter().copied().find(|state| *state as u8 == value)
    }

    /// Variant name as a static string, suitable for logs and error messages.
    #[inline]
    fn name(&self) -> &'static str {
        match *self {
            RuntimeState::Created => "Created",
            RuntimeState::Initialized => "Initialized",
            RuntimeState::Running => "Running",
            RuntimeState::ShuttingDown => "ShuttingDown",
            RuntimeState::Stopped => "Stopped",
        }
    }

    /// Whether the state machine permits moving from `self` to `next`.
    ///
    /// Only the pairs listed below are legal; everything else (including any
    /// transition out of `Stopped` and any self-transition) is rejected.
    #[inline]
    fn can_transition_to(&self, next: RuntimeState) -> bool {
        matches!(
            (*self, next),
            // Normal progress, or bail out to Stopped on an init error
            (RuntimeState::Created, RuntimeState::Initialized | RuntimeState::Stopped)
                // Normal progress, or bail out to Stopped on a start error
                | (RuntimeState::Initialized, RuntimeState::Running | RuntimeState::Stopped)
                // A running server must pass through ShuttingDown
                | (RuntimeState::Running, RuntimeState::ShuttingDown)
                | (RuntimeState::ShuttingDown, RuntimeState::Stopped)
        )
    }
}

impl std::fmt::Display for RuntimeState {
    /// Writes the same text as [`RuntimeState::name`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.name())
    }
}
143
/// Top-level handle for a configured NameServer instance.
///
/// Produced by `Builder::build`; call `boot` to initialize the runtime and run
/// it until shutdown.
pub struct NameServerBootstrap {
    // Runtime that `boot` initializes and starts
    name_server_runtime: NameServerRuntime,
}
147
/// Builder for creating NameServerBootstrap with custom configuration
///
/// Any configuration left unset falls back to its `Default` value in `build`.
pub struct Builder {
    // NameServer settings; `None` means "use `NamesrvConfig::default()`"
    name_server_config: Option<NamesrvConfig>,
    // Network server settings; `None` means "use `ServerConfig::default()`"
    server_config: Option<ServerConfig>,
}
153
/// Core runtime managing NameServer lifecycle and operations
///
/// Coordinates initialization, startup, and graceful shutdown of all components.
struct NameServerRuntime {
    /// Shared configuration and components; clones are handed to the request
    /// processors and to the scheduled broker scan
    inner: ArcMut<NameServerRuntimeInner>,
    /// Scheduler that runs the periodic broker health check
    scheduled_task_manager: ScheduledTaskManager,
    /// Receiver for the shutdown broadcast; installed by `boot` before `initialize`
    shutdown_rx: Option<broadcast::Receiver<()>>,
    /// Network server; created during `initialize` and moved into its task by `start`
    server_inner: Option<RocketMQServer<NameServerRequestProcessor>>,
    /// Server task handle for graceful shutdown
    server_task_handle: Option<tokio::task::JoinHandle<()>>,
    /// Runtime state machine for lifecycle management (raw `RuntimeState` discriminant)
    state: Arc<AtomicU8>,
}
167
168impl NameServerBootstrap {
169    /// Boot the NameServer and run until shutdown signal
170    ///
171    /// This is the main entry point that orchestrates:
172    /// 1. Component initialization
173    /// 2. Server startup
174    /// 3. Graceful shutdown on signal
175    #[instrument(skip(self), name = "nameserver_boot")]
176    pub async fn boot(mut self) -> RocketMQResult<()> {
177        info!("Booting RocketMQ NameServer (Rust)...");
178
179        let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
180        self.name_server_runtime.shutdown_rx = Some(shutdown_rx);
181        self.name_server_runtime.initialize().await?;
182
183        tokio::join!(
184            self.name_server_runtime.start(),
185            wait_for_signal_inner(shutdown_tx)
186        );
187
188        info!("NameServer shutdown completed");
189        Ok(())
190    }
191}
192
193/// Wait for system shutdown signal (SIGINT, SIGTERM)
194#[inline]
195async fn wait_for_signal_inner(shutdown_tx: broadcast::Sender<()>) {
196    tokio::select! {
197        _ = wait_for_signal() => {
198            info!("Shutdown signal received, broadcasting to all components...");
199        }
200    }
201    // Broadcast shutdown to all listeners
202    if let Err(e) = shutdown_tx.send(()) {
203        error!("Failed to broadcast shutdown signal: {}", e);
204    }
205}
206
207impl NameServerRuntime {
208    /// Get current runtime state
209    #[inline]
210    fn current_state(&self) -> RuntimeState {
211        let value = self.state.load(Ordering::Acquire);
212        RuntimeState::from_u8(value).unwrap_or_else(|| {
213            error!("Invalid runtime state value: {}", value);
214            RuntimeState::Stopped
215        })
216    }
217
218    /// Attempt to transition to a new state
219    ///
220    /// Returns `Ok(())` if transition succeeds, `Err` if transition is invalid.
221    #[inline]
222    fn transition_to(&self, next: RuntimeState) -> RocketMQResult<()> {
223        let current = self.current_state();
224
225        if !current.can_transition_to(next) {
226            let error_msg = format!(
227                "Invalid state transition: {} -> {}. Current state does not allow this transition.",
228                current.name(),
229                next.name()
230            );
231            error!("{}", error_msg);
232            return Err(RocketMQError::Internal(error_msg));
233        }
234
235        // Perform atomic state transition
236        let old_value = self.state.swap(next as u8, Ordering::AcqRel);
237        let old_state = RuntimeState::from_u8(old_value).unwrap_or(RuntimeState::Stopped);
238
239        info!("State transition: {} -> {}", old_state.name(), next.name());
240
241        Ok(())
242    }
243
244    /// Validate that current state is one of the expected states
245    #[inline]
246    fn validate_state(&self, expected: &[RuntimeState], operation: &str) -> RocketMQResult<()> {
247        let current = self.current_state();
248
249        if !expected.contains(&current) {
250            let expected_names: Vec<_> = expected.iter().map(|s| s.name()).collect();
251            let error_msg = format!(
252                "Operation '{}' requires state to be one of [{}], but current state is {}",
253                operation,
254                expected_names.join(", "),
255                current.name()
256            );
257            error!("{}", error_msg);
258            return Err(RocketMQError::Internal(error_msg));
259        }
260
261        Ok(())
262    }
263
264    /// Initialize all components in proper order
265    ///
266    /// Initialization sequence:
267    /// 1. Load KV configuration from disk
268    /// 2. Initialize network server
269    /// 3. Setup RPC hooks
270    /// 4. Start scheduled health monitoring tasks
271    #[instrument(skip(self), name = "runtime_initialize")]
272    pub async fn initialize(&mut self) -> RocketMQResult<()> {
273        // Validate we're in Created state
274        self.validate_state(&[RuntimeState::Created], "initialize")?;
275
276        info!("Phase 1/4: Loading configuration...");
277        if let Err(e) = self.load_config().await {
278            error!("Initialization failed during config load: {}", e);
279            let _ = self.transition_to(RuntimeState::Stopped);
280            return Err(e);
281        }
282
283        info!("Phase 2/4: Initializing network server...");
284        self.initialize_network_components();
285
286        info!("Phase 3/4: Registering RPC hooks...");
287        self.initialize_rpc_hooks();
288
289        info!("Phase 4/4: Starting scheduled tasks...");
290        self.start_schedule_service();
291
292        // Transition to Initialized state
293        self.transition_to(RuntimeState::Initialized)?;
294
295        info!("Initialization completed successfully");
296        Ok(())
297    }
298
299    async fn load_config(&mut self) -> RocketMQResult<()> {
300        // KVConfigManager is now always initialized
301        self.inner.kvconfig_manager_mut().load().map_err(|e| {
302            error!("KV config load failed: {}", e);
303            RocketMQError::storage_read_failed(
304                "kv_config",
305                format!("Configuration load error: {}", e),
306            )
307        })?;
308        debug!("KV configuration loaded successfully");
309        Ok(())
310    }
311
312    /// Initialize network server for handling client requests
313    fn initialize_network_components(&mut self) {
314        let server = RocketMQServer::new(Arc::new(self.inner.server_config().clone()));
315        self.server_inner = Some(server);
316        debug!(
317            "Network server initialized on port {}",
318            self.inner.server_config().listen_port
319        );
320    }
321
322    /// Start scheduled tasks for system health monitoring
323    ///
324    /// Schedules periodic broker health checks to detect and remove inactive brokers
325    fn start_schedule_service(&self) {
326        let scan_not_active_broker_interval = self
327            .inner
328            .name_server_config()
329            .scan_not_active_broker_interval;
330        let mut name_server_runtime_inner = self.inner.clone();
331
332        self.scheduled_task_manager.add_fixed_rate_task_async(
333            Duration::from_secs(5), // Initial delay
334            Duration::from_millis(scan_not_active_broker_interval),
335            async move |_ctx| {
336                debug!("Running scheduled broker health check");
337                if let Some(route_info_manager) =
338                    name_server_runtime_inner.route_info_manager.as_mut()
339                {
340                    route_info_manager.scan_not_active_broker();
341                }
342                Ok(())
343            },
344        );
345
346        info!(
347            "Scheduled task started: broker health check (interval: {}ms)",
348            scan_not_active_broker_interval
349        );
350    }
351
352    /// Initialize RPC hooks for request pre/post-processing
353    fn initialize_rpc_hooks(&mut self) {
354        if let Some(server) = self.server_inner.as_mut() {
355            server.register_rpc_hook(Arc::new(ZoneRouteRPCHook));
356            debug!("RPC hooks registered: ZoneRouteRPCHook");
357        }
358    }
359
360    /// Start the server and enter main event loop
361    ///
362    /// This method:
363    /// 1. Initializes request processors
364    /// 2. Starts the network server in async task
365    /// 3. Starts the remoting client
366    /// 4. Waits for shutdown signal
367    /// 5. Performs graceful shutdown
368    #[instrument(skip(self), name = "runtime_start")]
369    pub async fn start(&mut self) {
370        // Validate we're in Initialized state
371        if let Err(e) = self.validate_state(&[RuntimeState::Initialized], "start") {
372            error!("Cannot start: {}", e);
373            return;
374        }
375
376        info!("Starting NameServer main loop...");
377
378        let request_processor = self.init_processors();
379
380        // Take server instance for async execution
381        let mut server = self
382            .server_inner
383            .take()
384            .expect("Server not initialized - call initialize() first");
385
386        // Start route info manager service
387        self.inner.route_info_manager().start();
388
389        // Get broker housekeeping service for server
390        let channel_event_listener =
391            Some(self.inner.broker_housekeeping_service().clone() as Arc<dyn ChannelEventListener>);
392
393        // Spawn server task and retain handle for graceful shutdown
394        let server_handle = tokio::spawn(async move {
395            debug!("Server task started");
396            server.run(request_processor, channel_event_listener).await;
397            debug!("Server task completed");
398        });
399        self.server_task_handle = Some(server_handle);
400
401        // Setup remoting client with name server address
402        let local_address = NetworkUtil::get_local_address().unwrap_or_else(|| {
403            warn!("Failed to determine local address, using 127.0.0.1");
404            "127.0.0.1".to_string()
405        });
406
407        let namesrv = CheetahString::from_string(format!(
408            "{}:{}",
409            local_address,
410            self.inner.server_config().listen_port
411        ));
412
413        debug!("NameServer address: {}", namesrv);
414
415        let weak_arc_mut = ArcMut::downgrade(&self.inner.remoting_client);
416        self.inner
417            .remoting_client
418            .update_name_server_address_list(vec![namesrv])
419            .await;
420
421        // Start remoting client directly (no spawn needed as it's managed by self.inner)
422        self.inner.remoting_client.start(weak_arc_mut).await;
423
424        // Transition to Running state
425        if let Err(e) = self.transition_to(RuntimeState::Running) {
426            error!("Failed to transition to Running state: {}", e);
427            return;
428        }
429
430        info!("NameServer is now running and accepting requests");
431
432        // Wait for shutdown signal
433        tokio::select! {
434            result = self.shutdown_rx.as_mut()
435                .expect("Shutdown channel not initialized")
436                .recv() => {
437                match result {
438                    Ok(_) => info!("Shutdown signal received, initiating graceful shutdown..."),
439                    Err(e) => error!("Error receiving shutdown signal: {}", e),
440                }
441                self.shutdown().await;
442            }
443        }
444    }
445
446    /// Perform graceful shutdown of all components
447    ///
448    /// Shutdown sequence:
449    /// 1. Wait for in-flight requests to complete (with timeout)
450    /// 2. Cancel all scheduled tasks
451    /// 3. Shutdown route info manager (broker unregistration)
452    /// 4. Wait for server task to complete (with timeout)
453    /// 5. Release all resources
454    #[instrument(skip(self), name = "runtime_shutdown")]
455    async fn shutdown(&mut self) {
456        // Validate we're in Running state and transition to ShuttingDown
457        if let Err(e) = self.validate_state(&[RuntimeState::Running], "shutdown") {
458            warn!("Shutdown called in unexpected state: {}", e);
459            // Allow shutdown even in unexpected state for safety
460        }
461
462        if let Err(e) = self.transition_to(RuntimeState::ShuttingDown) {
463            error!("Failed to transition to ShuttingDown state: {}", e);
464        }
465
466        const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
467        const TASK_JOIN_TIMEOUT: Duration = Duration::from_secs(10);
468
469        info!(
470            "Phase 1/4: Waiting for in-flight requests (timeout: {}s)...",
471            SHUTDOWN_TIMEOUT.as_secs()
472        );
473        if let Err(e) = self.wait_for_inflight_requests(SHUTDOWN_TIMEOUT).await {
474            warn!("In-flight request wait timeout or error: {}", e);
475        }
476
477        info!("Phase 2/4: Stopping scheduled tasks...");
478        self.scheduled_task_manager.cancel_all();
479
480        info!("Phase 3/4: Shutting down route info manager...");
481        self.inner
482            .route_info_manager_mut()
483            .shutdown_unregister_service();
484
485        info!(
486            "Phase 4/4: Waiting for server task (timeout: {}s)...",
487            TASK_JOIN_TIMEOUT.as_secs()
488        );
489        if let Err(e) = self.wait_for_server_task(TASK_JOIN_TIMEOUT).await {
490            warn!("Task join timeout or error: {}", e);
491        }
492
493        // Transition to Stopped state
494        if let Err(e) = self.transition_to(RuntimeState::Stopped) {
495            error!("Failed to transition to Stopped state: {}", e);
496        }
497
498        info!("Graceful shutdown completed");
499    }
500
501    /// Wait for all in-flight requests to complete
502    ///
503    /// This provides a grace period for ongoing requests to finish before shutdown.
504    /// Returns immediately if no requests are in-flight.
505    #[instrument(skip(self), name = "wait_inflight_requests")]
506    async fn wait_for_inflight_requests(&self, timeout: Duration) -> RocketMQResult<()> {
507        tokio::time::timeout(timeout, async {
508            // In a production implementation, you would:
509            // 1. Check active connection count
510            // 2. Monitor pending request queue
511            // 3. Wait for all to drain
512            // For now, we provide a short grace period
513            tokio::time::sleep(Duration::from_millis(500)).await;
514            debug!("In-flight request grace period completed");
515        })
516        .await
517        .map_err(|_| RocketMQError::Timeout {
518            operation: "wait_for_inflight_requests",
519            timeout_ms: timeout.as_millis() as u64,
520        })
521    }
522
523    /// Wait for server task to complete
524    ///
525    /// Attempts graceful join with timeout. If timeout is exceeded,
526    /// task is aborted.
527    #[instrument(skip(self), name = "wait_server_task")]
528    async fn wait_for_server_task(&mut self, timeout: Duration) -> RocketMQResult<()> {
529        if let Some(handle) = self.server_task_handle.take() {
530            match tokio::time::timeout(timeout, handle).await {
531                Ok(Ok(())) => {
532                    debug!("Server task completed successfully");
533                    Ok(())
534                }
535                Ok(Err(e)) => {
536                    error!("Server task panicked: {}", e);
537                    Err(RocketMQError::Internal(format!(
538                        "Server task panicked: {}",
539                        e
540                    )))
541                }
542                Err(_) => {
543                    warn!(
544                        "Server task join timeout ({}s), task may still be running",
545                        timeout.as_secs()
546                    );
547                    Err(RocketMQError::Timeout {
548                        operation: "server_task_join",
549                        timeout_ms: timeout.as_millis() as u64,
550                    })
551                }
552            }
553        } else {
554            debug!("No server task handle to wait for");
555            Ok(())
556        }
557    }
558
559    /// Initialize and configure request processor pipeline
560    ///
561    /// Creates specialized processors for different request types:
562    /// - ClientRequestProcessor: Handles topic route queries
563    /// - DefaultRequestProcessor: Handles all other requests
564    #[inline]
565    fn init_processors(&self) -> NameServerRequestProcessor {
566        let client_request_processor = ClientRequestProcessor::new(self.inner.clone());
567        let default_request_processor =
568            crate::processor::default_request_processor::DefaultRequestProcessor::new(
569                self.inner.clone(),
570            );
571
572        let mut name_server_request_processor = NameServerRequestProcessor::new();
573
574        // Register topic route query processor
575        name_server_request_processor.register_processor(
576            RequestCode::GetRouteinfoByTopic,
577            NameServerRequestProcessorWrapper::ClientRequestProcessor(ArcMut::new(
578                client_request_processor,
579            )),
580        );
581
582        // Register default processor for all other requests
583        name_server_request_processor.register_default_processor(
584            NameServerRequestProcessorWrapper::DefaultRequestProcessor(ArcMut::new(
585                default_request_processor,
586            )),
587        );
588
589        debug!("Request processor pipeline configured");
590        name_server_request_processor
591    }
592}
593
594impl Drop for NameServerRuntime {
595    #[inline]
596    fn drop(&mut self) {
597        let current_state = self.current_state();
598        debug!("NameServerRuntime dropped in state: {}", current_state);
599
600        // Warn if not properly shut down
601        if current_state != RuntimeState::Stopped {
602            warn!(
603                "NameServerRuntime dropped without proper shutdown (current state: {}). This may \
604                 indicate a panic or abnormal termination.",
605                current_state
606            );
607        }
608    }
609}
610
611impl Default for Builder {
612    #[inline]
613    fn default() -> Self {
614        Self::new()
615    }
616}
617
618impl Builder {
619    #[inline]
620    pub fn new() -> Self {
621        Builder {
622            name_server_config: None,
623            server_config: None,
624        }
625    }
626
627    #[inline]
628    pub fn set_name_server_config(mut self, name_server_config: NamesrvConfig) -> Self {
629        self.name_server_config = Some(name_server_config);
630        self
631    }
632
633    #[inline]
634    pub fn set_server_config(mut self, server_config: ServerConfig) -> Self {
635        self.server_config = Some(server_config);
636        self
637    }
638
639    /// Build the NameServerBootstrap with configured settings
640    ///
641    /// Creates all necessary components and initializes them immediately.
642    #[instrument(skip(self), name = "build_bootstrap")]
643    pub fn build(self) -> NameServerBootstrap {
644        let name_server_config = self.name_server_config.unwrap_or_default();
645        let tokio_client_config = TokioClientConfig::default();
646        let server_config = self.server_config.unwrap_or_default();
647
648        info!("Building NameServer with configuration:");
649        info!("  - Listen port: {}", server_config.listen_port);
650        info!(
651            "  - Scan interval: {}ms",
652            name_server_config.scan_not_active_broker_interval
653        );
654        info!(
655            "  - Use V2 RouteManager: {}",
656            name_server_config.use_route_info_manager_v2
657        );
658
659        // Create remoting client
660        let remoting_client = ArcMut::new(RocketmqDefaultClient::new(
661            Arc::new(tokio_client_config.clone()),
662            DefaultRemotingRequestProcessor,
663        ));
664
665        // Create inner with empty components first
666        let mut inner = ArcMut::new(NameServerRuntimeInner {
667            name_server_config: name_server_config.clone(),
668            tokio_client_config: tokio_client_config.clone(),
669            server_config: server_config.clone(),
670            route_info_manager: None,
671            kvconfig_manager: None,
672            remoting_client,
673            broker_housekeeping_service: None,
674        });
675
676        // Check configuration flag for RouteInfoManager version
677        let use_v2 = name_server_config.use_route_info_manager_v2;
678
679        // Now initialize components with proper references
680        let route_info_manager = if use_v2 {
681            info!("Using RouteInfoManager V2 (DashMap-based, 5-50x faster)");
682            RouteInfoManagerWrapper::V2(Box::new(RouteInfoManagerV2::new(inner.clone())))
683        } else {
684            warn!("Using RouteInfoManager V1 (legacy). Consider V2 for better performance.");
685            RouteInfoManagerWrapper::V1(Box::new(RouteInfoManager::new(inner.clone())))
686        };
687
688        let kvconfig_manager = KVConfigManager::new(inner.clone());
689        let broker_housekeeping_service = Arc::new(BrokerHousekeepingService::new(inner.clone()));
690
691        // Assign initialized components
692        inner.route_info_manager = Some(route_info_manager);
693        inner.kvconfig_manager = Some(kvconfig_manager);
694        inner.broker_housekeeping_service = Some(broker_housekeeping_service);
695
696        info!("NameServer bootstrap built successfully");
697
698        NameServerBootstrap {
699            name_server_runtime: NameServerRuntime {
700                inner,
701                scheduled_task_manager: ScheduledTaskManager::new(),
702                shutdown_rx: None,
703                server_inner: None,
704                server_task_handle: None,
705                state: Arc::new(AtomicU8::new(RuntimeState::Created as u8)),
706            },
707        }
708    }
709}
710
/// Internal runtime state shared across components
///
/// Holds the configuration plus the long-lived components of the NameServer.
/// Handles to this struct are cloned into the request processors, the
/// scheduled tasks and the components themselves.
/// Note: Config kept mutable to support runtime updates via management commands.
pub(crate) struct NameServerRuntimeInner {
    // Configuration (mutable to support runtime updates)
    name_server_config: NamesrvConfig,
    tokio_client_config: TokioClientConfig,
    server_config: ServerConfig,

    // Components stored as `Option`: each one is constructed from a handle to
    // this struct, so the struct is created with `None` and filled in afterwards
    route_info_manager: Option<RouteInfoManagerWrapper>,
    kvconfig_manager: Option<KVConfigManager>,
    // Remoting client; available from construction, hence not an `Option`
    remoting_client: ArcMut<RocketmqDefaultClient>,
    broker_housekeeping_service: Option<Arc<BrokerHousekeepingService>>,
}
727
728impl NameServerRuntimeInner {
729    // Configuration accessors
730
731    #[inline]
732    pub fn name_server_config(&self) -> &NamesrvConfig {
733        &self.name_server_config
734    }
735
736    #[inline]
737    pub fn name_server_config_mut(&mut self) -> &mut NamesrvConfig {
738        &mut self.name_server_config
739    }
740
741    #[inline]
742    pub fn tokio_client_config(&self) -> &TokioClientConfig {
743        &self.tokio_client_config
744    }
745
746    #[inline]
747    pub fn server_config(&self) -> &ServerConfig {
748        &self.server_config
749    }
750
751    // Component accessors (with Option handling)
752
753    #[inline]
754    pub fn route_info_manager(&self) -> &RouteInfoManagerWrapper {
755        self.route_info_manager
756            .as_ref()
757            .expect("RouteInfoManager not initialized")
758    }
759
760    #[inline]
761    pub fn route_info_manager_mut(&mut self) -> &mut RouteInfoManagerWrapper {
762        self.route_info_manager
763            .as_mut()
764            .expect("RouteInfoManager not initialized")
765    }
766
767    #[inline]
768    pub fn kvconfig_manager(&self) -> &KVConfigManager {
769        self.kvconfig_manager
770            .as_ref()
771            .expect("KVConfigManager not initialized")
772    }
773
774    #[inline]
775    pub fn kvconfig_manager_mut(&mut self) -> &mut KVConfigManager {
776        self.kvconfig_manager
777            .as_mut()
778            .expect("KVConfigManager not initialized")
779    }
780
781    #[inline]
782    pub fn remoting_client(&self) -> &RocketmqDefaultClient {
783        &self.remoting_client
784    }
785
786    #[inline]
787    pub fn remoting_client_mut(&mut self) -> &mut RocketmqDefaultClient {
788        &mut self.remoting_client
789    }
790
791    #[inline]
792    pub fn broker_housekeeping_service(&self) -> &Arc<BrokerHousekeepingService> {
793        self.broker_housekeeping_service
794            .as_ref()
795            .expect("BrokerHousekeepingService not initialized")
796    }
797}