Skip to main content

demo/
bootstrap.rs

1//! Starts the demo-owned dashboard IPC and registration runtime.
2
3// Import the demo scenario state holder.
4use crate::scenario::DemoScenario;
5// Import validated configuration state.
6use rust_supervisor::config::state::ConfigState;
7// Import dashboard configuration validation.
8use rust_supervisor::dashboard::config::{
9    // Continue the demo expression.
10    ValidatedDashboardIpcConfig,
11    // Import the IPC config validator.
12    validate_dashboard_ipc_config,
13    // Continue the demo expression.
14};
15// Import dashboard errors.
16use rust_supervisor::dashboard::error::DashboardError;
17// Import dashboard socket binding helper.
18use rust_supervisor::dashboard::ipc_server::bind_dashboard_listener;
19// Import dashboard protocol contracts.
20use rust_supervisor::dashboard::protocol::{
21    // Continue the demo expression.
22    DASHBOARD_IPC_PROTOCOL_VERSION,
23    // Import the method parser.
24    IpcMethod,
25    // Import the request shape.
26    IpcRequest,
27    // Import the response shape.
28    IpcResponse,
29    // Import successful result shapes.
30    IpcResult,
31    // Continue the demo expression.
32    decode_command_params,
33    // Import request line parsing.
34    parse_request_line,
35    // Import response line serialization.
36    response_to_line,
37    // Continue the demo expression.
38};
39// Import dashboard registration helpers.
40use rust_supervisor::dashboard::registration::{
41    // Continue the demo expression.
42    build_registration_payload,
43    // Import heartbeat execution.
44    run_registration_heartbeat,
45    // Continue the demo expression.
46};
47// Import formatting support for guard diagnostics.
48use std::fmt;
49// Import path storage for socket cleanup.
50use std::path::PathBuf;
51// Import shared ownership for per-connection services.
52use std::sync::Arc;
53// Import asynchronous line I/O traits.
54use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
55// Import Unix socket types.
56use tokio::net::{UnixListener, UnixStream};
57// Import background task handles.
58use tokio::task::{JoinHandle, JoinSet};
59
60/// Demo dashboard runtime guard.
61pub(crate) struct DemoDashboardRuntimeGuard {
62    /// Socket path created by the demo runtime.
63    ipc_path: PathBuf,
64    /// Target-side IPC accept task.
65    ipc_task: JoinHandle<()>,
66    /// Optional registration heartbeat task.
67    heartbeat_task: Option<JoinHandle<()>>,
68    /// Target process identifier.
69    target_id: String,
70    /// Optional relay registration path.
71    registration_path: Option<PathBuf>,
72    // Continue the demo expression.
73}
74
75// Continue the demo expression.
76impl DemoDashboardRuntimeGuard {
77    /// Returns the target process identifier.
78    ///
79    /// # Arguments
80    ///
81    /// This function has no arguments.
82    ///
83    /// # Returns
84    ///
85    /// Returns the target identifier.
86    pub(crate) fn target_id(&self) -> &str {
87        // Return target identifier.
88        &self.target_id
89        // End target identifier access.
90    }
91
92    /// Returns the IPC socket path.
93    ///
94    /// # Arguments
95    ///
96    /// This function has no arguments.
97    ///
98    /// # Returns
99    ///
100    /// Returns the IPC path.
101    pub(crate) fn ipc_path(&self) -> &std::path::Path {
102        // Return IPC path.
103        &self.ipc_path
104        // End IPC path access.
105    }
106
107    /// Returns the registration socket path.
108    ///
109    /// # Arguments
110    ///
111    /// This function has no arguments.
112    ///
113    /// # Returns
114    ///
115    /// Returns the optional registration path.
116    pub(crate) fn registration_path(&self) -> Option<&std::path::Path> {
117        // Return optional registration path.
118        self.registration_path.as_deref()
119        // End registration path access.
120    }
121    // Continue the demo expression.
122}
123
124// Continue the demo expression.
125impl fmt::Debug for DemoDashboardRuntimeGuard {
126    /// Formats guard diagnostics without exposing task internals.
127    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
128        // Build a concise debug representation.
129        formatter
130            // Name the guard type.
131            .debug_struct("DemoDashboardRuntimeGuard")
132            // Include IPC path.
133            .field("ipc_path", &self.ipc_path)
134            // Include target identifier.
135            .field("target_id", &self.target_id)
136            // Include registration path.
137            .field("registration_path", &self.registration_path)
138            // Include heartbeat task presence.
139            .field("has_heartbeat_task", &self.heartbeat_task.is_some())
140            // Finish without exposing private task state.
141            .finish_non_exhaustive()
142        // End debug formatting.
143    }
144    // Continue the demo expression.
145}
146
147// Continue the demo expression.
148impl Drop for DemoDashboardRuntimeGuard {
149    /// Stops demo background tasks and removes the socket created by this runtime.
150    fn drop(&mut self) {
151        // Abort the IPC accept task.
152        self.ipc_task.abort();
153        // Abort the heartbeat task when present.
154        if let Some(task) = self.heartbeat_task.as_ref() {
155            // Abort registration heartbeat.
156            task.abort();
157            // End heartbeat branch.
158        }
159        // Remove the socket file owned by this process.
160        if let Err(error) = std::fs::remove_file(&self.ipc_path) {
161            // Ignore already-removed sockets.
162            if error.kind() != std::io::ErrorKind::NotFound {
163                // Print cleanup failure for the demo operator.
164                eprintln!(
165                    // Continue the demo expression.
166                    "failed to remove demo IPC socket {}: {error}",
167                    // Continue the demo expression.
168                    self.ipc_path.display() // Continue the demo expression.
169                                            // Finish cleanup warning output.
170                );
171                // End cleanup warning branch.
172            }
173            // End remove error branch.
174        }
175        // End runtime guard cleanup.
176    }
177    // Continue the demo expression.
178}
179
180/// Starts the demo dashboard runtime when IPC is enabled.
181///
182/// # Arguments
183///
184/// - `state`: Loaded supervisor configuration state.
185///
186/// # Returns
187///
188/// Returns a runtime guard when IPC is enabled.
189pub(crate) fn start_demo_dashboard_runtime(
190    // Continue the demo expression.
191    state: &ConfigState,
192    // Continue the demo expression.
193) -> Result<Option<DemoDashboardRuntimeGuard>, Box<dyn std::error::Error + Send + Sync>> {
194    // Validate the configured dashboard IPC section.
195    let Some(config) = validate_dashboard_ipc_config(state.ipc.as_ref())? else {
196        // Return no runtime when IPC is disabled.
197        return Ok(None);
198        // End disabled IPC branch.
199    };
200    // Bind the configured demo IPC socket.
201    let listener = bind_dashboard_listener(&config)?;
202    // Clone the IPC path for cleanup.
203    let ipc_path = config.path.clone();
204    // Clone the target identifier for summaries.
205    let target_id = config.target_id.clone();
206    // Clone the optional registration path for summaries.
207    let registration_path = config
208        // Borrow the optional registration config.
209        .registration
210        // Read the optional registration config.
211        .as_ref()
212        // Clone the configured relay path.
213        .map(|registration| registration.relay_registration_path.clone());
214    // Build the demo service.
215    let service = Arc::new(DemoIpcService::new(config.clone()));
216    // Start the IPC accept loop.
217    let ipc_task = tokio::spawn(run_accept_loop(
218        // Continue the demo expression.
219        listener,
220        // Continue the demo expression.
221        Arc::clone(&service),
222        // Continue the demo expression.
223        target_id.clone(),
224        // Continue the demo expression.
225    ));
226    // Start registration heartbeat when configured.
227    let heartbeat_task = start_heartbeat_task(config);
228    // Return the demo runtime guard.
229    Ok(Some(DemoDashboardRuntimeGuard {
230        // Store IPC path for cleanup.
231        ipc_path,
232        // Store IPC task.
233        ipc_task,
234        // Store optional heartbeat task.
235        heartbeat_task,
236        // Store target identifier.
237        target_id,
238        // Store optional registration path.
239        registration_path,
240        // End guard construction.
241    }))
242    // End runtime startup.
243}
244
245/// Starts the dynamic registration heartbeat when registration is enabled.
246///
247/// # Arguments
248///
249/// - `config`: Validated IPC configuration.
250///
251/// # Returns
252///
253/// Returns an optional task handle.
254fn start_heartbeat_task(config: ValidatedDashboardIpcConfig) -> Option<JoinHandle<()>> {
255    // Skip heartbeat when registration is absent.
256    config.registration.as_ref()?;
257    // Spawn the heartbeat loop.
258    Some(tokio::spawn(async move {
259        // Run registration heartbeat until it stops.
260        if let Err(error) = run_registration_heartbeat(config).await {
261            // Print non-retryable registration failure.
262            eprintln!("demo registration heartbeat stopped: {error}");
263            // End heartbeat error branch.
264        }
265        // End heartbeat task.
266    }))
267    // End heartbeat task startup.
268}
269
270/// Demo IPC request dispatcher.
271struct DemoIpcService {
272    /// Validated IPC configuration.
273    config: ValidatedDashboardIpcConfig,
274    /// Mutable dashboard scenario.
275    scenario: DemoScenario,
276    // Continue the demo expression.
277}
278
279// Continue the demo expression.
280impl DemoIpcService {
281    /// Creates the demo IPC service.
282    ///
283    /// # Arguments
284    ///
285    /// - `config`: Validated IPC configuration.
286    ///
287    /// # Returns
288    ///
289    /// Returns a demo service.
290    fn new(config: ValidatedDashboardIpcConfig) -> Self {
291        // Resolve the display name from registration config.
292        let display_name = config
293            // Borrow optional registration config.
294            .registration
295            // Read optional registration config.
296            .as_ref()
297            // Clone display name when present.
298            .map(|registration| registration.display_name.clone())
299            // Fall back to target identifier.
300            .unwrap_or_else(|| config.target_id.clone());
301        // Create the demo service.
302        Self {
303            // Store validated config.
304            config: config.clone(),
305            // Store mutable scenario.
306            scenario: DemoScenario::new(config.target_id.clone(), display_name),
307            // End service construction.
308        }
309        // End service construction.
310    }
311
312    /// Handles one parsed IPC request.
313    ///
314    /// # Arguments
315    ///
316    /// - `request`: Parsed IPC request.
317    ///
318    /// # Returns
319    ///
320    /// Returns an IPC response.
321    async fn handle_request(&self, request: IpcRequest) -> IpcResponse {
322        // Dispatch the request.
323        match self.dispatch(&request).await {
324            // Return success response.
325            Ok(result) => IpcResponse::ok(request.request_id, result),
326            // Return error response.
327            Err(error) => IpcResponse::error(request.request_id, error),
328            // End dispatch match.
329        }
330        // End request handling.
331    }
332
333    /// Dispatches one request by method.
334    ///
335    /// # Arguments
336    ///
337    /// - `request`: Parsed IPC request.
338    ///
339    /// # Returns
340    ///
341    /// Returns a typed IPC result.
342    async fn dispatch(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
343        // Parse the request method.
344        let method = IpcMethod::parse(&request.method)?;
345        // Dispatch by method.
346        match method {
347            // Return protocol hello response.
348            IpcMethod::Hello => Ok(IpcResult::Hello {
349                // Include protocol version.
350                protocol_version: DASHBOARD_IPC_PROTOCOL_VERSION.to_owned(),
351                // Include registration payload.
352                registration: build_registration_payload(&self.config)?,
353                // End hello payload.
354            }),
355            // Return the current demo state.
356            IpcMethod::CurrentState => {
357                // Build current scenario state.
358                let state = self.scenario.state();
359                // Return state payload.
360                Ok(IpcResult::State {
361                    // Include target identifier.
362                    target_id: state.target.target_id.clone(),
363                    // Include boxed dashboard state.
364                    state: Box::new(state),
365                    // End state payload.
366                })
367                // End current state branch.
368            }
369            // Accept event subscription.
370            IpcMethod::EventsSubscribe => Ok(self.subscription("events")),
371            // Accept log subscription.
372            IpcMethod::LogsTail => Ok(self.subscription("logs")),
373            // Dispatch control command methods.
374            IpcMethod::CommandRestartChild
375            // Continue the demo expression.
376            | IpcMethod::CommandPauseChild
377            // Continue the demo expression.
378            | IpcMethod::CommandResumeChild
379            // Continue the demo expression.
380            | IpcMethod::CommandQuarantineChild
381            // Continue the demo expression.
382            | IpcMethod::CommandRemoveChild
383            // Continue the demo expression.
384            | IpcMethod::CommandAddChild
385            // Continue the demo expression.
386            | IpcMethod::CommandShutdownTree => self.command_result(request),
387            // End method match.
388        }
389        // End dispatch.
390    }
391
392    /// Builds one subscription response.
393    ///
394    /// # Arguments
395    ///
396    /// - `subscription`: Subscription kind.
397    ///
398    /// # Returns
399    ///
400    /// Returns a subscription result.
401    fn subscription(&self, subscription: &str) -> IpcResult {
402        // Build the subscription payload.
403        IpcResult::Subscription {
404            // Include target identifier.
405            target_id: self.scenario.target_id().to_owned(),
406            // Include subscription kind.
407            subscription: subscription.to_owned(),
408            // End subscription payload.
409        }
410        // End subscription construction.
411    }
412
413    /// Handles one command request.
414    ///
415    /// # Arguments
416    ///
417    /// - `request`: IPC request.
418    ///
419    /// # Returns
420    ///
421    /// Returns a command result IPC payload.
422    fn command_result(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
423        // Decode command parameters.
424        let command = decode_command_params(request)?;
425        // Apply the command to the scenario.
426        let result = self.scenario.command_result(command)?;
427        // Return command result payload.
428        Ok(IpcResult::CommandResult {
429            // Include target identifier.
430            target_id: self.scenario.target_id().to_owned(),
431            // Include command result.
432            result,
433            // End command result payload.
434        })
435        // End command result handling.
436    }
437    // Continue the demo expression.
438}
439
440/// Accepts demo IPC connections until the task is aborted.
441///
442/// # Arguments
443///
444/// - `listener`: Bound Unix listener.
445/// - `service`: Shared demo service.
446/// - `target_id`: Target process identifier.
447///
448/// # Returns
449///
450/// This async task has no returned value.
451async fn run_accept_loop(listener: UnixListener, service: Arc<DemoIpcService>, target_id: String) {
452    // Track connection tasks.
453    let mut connections = JoinSet::new();
454    // Accept connections until listener failure.
455    loop {
456        // Wait for either a new connection or a completed task.
457        tokio::select! {
458            // Accept one socket connection.
459            accepted = listener.accept() => {
460                // Handle accept result.
461                match accepted {
462                    // Spawn a connection task.
463                    Ok((stream, _)) => {
464                        // Clone the shared service.
465                        let service = Arc::clone(&service);
466                        // Clone the target identifier.
467                        let target_id = target_id.clone();
468                        // Spawn the per-connection task.
469                        connections.spawn(async move {
470                            // Handle the socket connection.
471                            handle_connection(stream, service, target_id).await
472                            // End connection task.
473                        });
474                    // Continue the demo expression.
475                    }
476                    // Stop when accept fails.
477                    Err(error) => {
478                        // Print accept failure.
479                        eprintln!("demo IPC accept loop stopped: {error}");
480                        // Leave the accept loop.
481                        break;
482                    // Continue the demo expression.
483                    }
484                    // End accept match.
485                }
486            // Continue the demo expression.
487            }
488            // Collect completed connection tasks.
489            Some(joined) = connections.join_next() => {
490                // Report task failures.
491                if let Err(error) = joined {
492                    // Print task failure.
493                    eprintln!("demo IPC connection task failed: {error}");
494                    // End task error branch.
495                }
496            // Continue the demo expression.
497            }
498        // Continue the demo expression.
499        }
500        // Continue accept loop.
501    }
502    // End accept loop.
503}
504
505/// Handles one newline-delimited JSON IPC connection.
506///
507/// # Arguments
508///
509/// - `stream`: Accepted Unix socket.
510/// - `service`: Shared demo service.
511/// - `target_id`: Target process identifier.
512///
513/// # Returns
514///
515/// Returns success when the socket closes cleanly.
516async fn handle_connection(
517    // Continue the demo expression.
518    stream: UnixStream,
519    // Continue the demo expression.
520    service: Arc<DemoIpcService>,
521    // Continue the demo expression.
522    target_id: String,
523    // Continue the demo expression.
524) -> Result<(), DashboardError> {
525    // Wrap the stream in a line reader.
526    let mut reader = BufReader::new(stream);
527    // Read requests until EOF.
528    loop {
529        // Allocate the request line.
530        let mut line = String::new();
531        // Read one newline-delimited request.
532        let bytes = reader.read_line(&mut line).await.map_err(|error| {
533            // Build read error.
534            io_error(
535                // Continue the demo expression.
536                "ipc_read_failed",
537                // Continue the demo expression.
538                "ipc_read",
539                // Continue the demo expression.
540                Some(target_id.clone()),
541                // Continue the demo expression.
542                error,
543                // Continue the demo expression.
544            )
545            // End read error construction.
546        })?;
547        // Stop when the peer closes the socket.
548        if bytes == 0 {
549            // Return clean close.
550            return Ok(());
551            // End EOF branch.
552        }
553        // Convert the request line into a response.
554        let response = response_for_line(&service, line.trim_end()).await;
555        // Write the response to the socket.
556        write_response(&mut reader, &response, &target_id).await?;
557        // Continue reading requests.
558    }
559    // End connection handling.
560}
561
562/// Converts one request line into a response.
563///
564/// # Arguments
565///
566/// - `service`: Demo IPC service.
567/// - `line`: One request line.
568///
569/// # Returns
570///
571/// Returns an IPC response.
572async fn response_for_line(service: &DemoIpcService, line: &str) -> IpcResponse {
573    // Parse the line.
574    match parse_request_line(line) {
575        // Dispatch parsed requests.
576        Ok(request) => service.handle_request(request).await,
577        // Return protocol errors.
578        Err(error) => IpcResponse::error("invalid-request", error),
579        // End parse match.
580    }
581    // End response conversion.
582}
583
584/// Writes one response line to the socket.
585///
586/// # Arguments
587///
588/// - `reader`: Socket reader wrapper.
589/// - `response`: IPC response.
590/// - `target_id`: Target process identifier.
591///
592/// # Returns
593///
594/// Returns success after the response is written.
595async fn write_response(
596    // Continue the demo expression.
597    reader: &mut BufReader<UnixStream>,
598    // Continue the demo expression.
599    response: &IpcResponse,
600    // Continue the demo expression.
601    target_id: &str,
602    // Continue the demo expression.
603) -> Result<(), DashboardError> {
604    // Serialize the response as one line.
605    let line = response_to_line(response)?;
606    // Write the response line.
607    reader
608        // Access the underlying stream.
609        .get_mut()
610        // Write bytes to the peer.
611        .write_all(line.as_bytes())
612        // Await completion.
613        .await
614        // Convert I/O failure into dashboard error.
615        .map_err(|error| {
616            // Continue the demo expression.
617            io_error(
618                // Continue the demo expression.
619                "ipc_write_failed",
620                // Continue the demo expression.
621                "ipc_write",
622                // Continue the demo expression.
623                Some(target_id.to_owned()),
624                // Continue the demo expression.
625                error,
626                // Continue the demo expression.
627            )
628            // Continue the demo expression.
629        })
630    // End response write.
631}
632
633/// Creates a structured IPC runtime I/O error.
634///
635/// # Arguments
636///
637/// - `code`: Error code.
638/// - `stage`: Error stage.
639/// - `target_id`: Optional target process identifier.
640/// - `error`: Source I/O error.
641///
642/// # Returns
643///
644/// Returns a dashboard error.
645fn io_error(
646    // Continue the demo expression.
647    code: &str,
648    // Continue the demo expression.
649    stage: &str,
650    // Continue the demo expression.
651    target_id: Option<String>,
652    // Continue the demo expression.
653    error: std::io::Error,
654    // Continue the demo expression.
655) -> DashboardError {
656    // Create a retryable I/O error.
657    DashboardError::new(code, stage, target_id, error.to_string(), true)
658    // End I/O error construction.
659}