demo/bootstrap.rs
1//! Starts the demo-owned dashboard IPC and registration runtime.
2
3// Import the demo scenario state holder.
4use crate::scenario::DemoScenario;
5// Import validated configuration state.
6use rust_supervisor::config::state::ConfigState;
7// Import dashboard configuration validation.
8use rust_supervisor::dashboard::config::{
9 // Import the validated IPC configuration type.
10 ValidatedDashboardIpcConfig,
11 // Import the IPC config validator.
12 validate_dashboard_ipc_config,
13 // Continue the demo expression.
14};
15// Import dashboard errors.
16use rust_supervisor::dashboard::error::DashboardError;
17// Import dashboard socket binding helper.
18use rust_supervisor::dashboard::ipc_server::bind_dashboard_listener;
19// Import dashboard protocol contracts.
20use rust_supervisor::dashboard::protocol::{
21 // Import the protocol version constant.
22 DASHBOARD_IPC_PROTOCOL_VERSION,
23 // Import the method parser.
24 IpcMethod,
25 // Import the request shape.
26 IpcRequest,
27 // Import the response shape.
28 IpcResponse,
29 // Import successful result shapes.
30 IpcResult,
31 // Import command parameter decoding.
32 decode_command_params,
33 // Import request line parsing.
34 parse_request_line,
35 // Import response line serialization.
36 response_to_line,
37 // Continue the demo expression.
38};
39// Import dashboard registration helpers.
40use rust_supervisor::dashboard::registration::{
41 // Import registration payload construction.
42 build_registration_payload,
43 // Import heartbeat execution.
44 run_registration_heartbeat,
45 // Continue the demo expression.
46};
47// Import formatting support for guard diagnostics.
48use std::fmt;
49// Import path storage for socket cleanup.
50use std::path::PathBuf;
51// Import shared ownership for per-connection services.
52use std::sync::Arc;
53// Import asynchronous line I/O traits.
54use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
55// Import Unix socket types.
56use tokio::net::{UnixListener, UnixStream};
57// Import background task handles.
58use tokio::task::{JoinHandle, JoinSet};
59
60/// Demo dashboard runtime guard.
61pub(crate) struct DemoDashboardRuntimeGuard {
62 /// Socket path created by the demo runtime.
63 ipc_path: PathBuf,
64 /// Target-side IPC accept task.
65 ipc_task: JoinHandle<()>,
66 /// Optional registration heartbeat task.
67 heartbeat_task: Option<JoinHandle<()>>,
68 /// Target process identifier.
69 target_id: String,
70 /// Optional relay registration path.
71 registration_path: Option<PathBuf>,
72 // Continue the demo expression.
73}
74
75// Continue the demo expression.
76impl DemoDashboardRuntimeGuard {
77 /// Returns the target process identifier.
78 ///
79 /// # Arguments
80 ///
81 /// This function has no arguments.
82 ///
83 /// # Returns
84 ///
85 /// Returns the target identifier.
86 pub(crate) fn target_id(&self) -> &str {
87 // Return target identifier.
88 &self.target_id
89 // End target identifier access.
90 }
91
92 /// Returns the IPC socket path.
93 ///
94 /// # Arguments
95 ///
96 /// This function has no arguments.
97 ///
98 /// # Returns
99 ///
100 /// Returns the IPC path.
101 pub(crate) fn ipc_path(&self) -> &std::path::Path {
102 // Return IPC path.
103 &self.ipc_path
104 // End IPC path access.
105 }
106
107 /// Returns the registration socket path.
108 ///
109 /// # Arguments
110 ///
111 /// This function has no arguments.
112 ///
113 /// # Returns
114 ///
115 /// Returns the optional registration path.
116 pub(crate) fn registration_path(&self) -> Option<&std::path::Path> {
117 // Return optional registration path.
118 self.registration_path.as_deref()
119 // End registration path access.
120 }
121 // Continue the demo expression.
122}
123
124// Continue the demo expression.
125impl fmt::Debug for DemoDashboardRuntimeGuard {
126 /// Formats guard diagnostics without exposing task internals.
127 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
128 // Build a concise debug representation.
129 formatter
130 // Name the guard type.
131 .debug_struct("DemoDashboardRuntimeGuard")
132 // Include IPC path.
133 .field("ipc_path", &self.ipc_path)
134 // Include target identifier.
135 .field("target_id", &self.target_id)
136 // Include registration path.
137 .field("registration_path", &self.registration_path)
138 // Include heartbeat task presence.
139 .field("has_heartbeat_task", &self.heartbeat_task.is_some())
140 // Finish without exposing private task state.
141 .finish_non_exhaustive()
142 // End debug formatting.
143 }
144 // Continue the demo expression.
145}
146
147// Continue the demo expression.
148impl Drop for DemoDashboardRuntimeGuard {
149 /// Stops demo background tasks and removes the socket created by this runtime.
150 fn drop(&mut self) {
151 // Abort the IPC accept task.
152 self.ipc_task.abort();
153 // Abort the heartbeat task when present.
154 if let Some(task) = self.heartbeat_task.as_ref() {
155 // Abort registration heartbeat.
156 task.abort();
157 // End heartbeat branch.
158 }
159 // Remove the socket file owned by this process.
160 if let Err(error) = std::fs::remove_file(&self.ipc_path) {
161 // Ignore already-removed sockets.
162 if error.kind() != std::io::ErrorKind::NotFound {
163 // Print cleanup failure for the demo operator.
164 eprintln!(
165 // Continue the demo expression.
166 "failed to remove demo IPC socket {}: {error}",
167 // Continue the demo expression.
168 self.ipc_path.display() // Continue the demo expression.
169 // Finish cleanup warning output.
170 );
171 // End cleanup warning branch.
172 }
173 // End remove error branch.
174 }
175 // End runtime guard cleanup.
176 }
177 // Continue the demo expression.
178}
179
180/// Starts the demo dashboard runtime when IPC is enabled.
181///
182/// # Arguments
183///
184/// - `state`: Loaded supervisor configuration state.
185///
186/// # Returns
187///
188/// Returns a runtime guard when IPC is enabled.
189pub(crate) fn start_demo_dashboard_runtime(
190 // Continue the demo expression.
191 state: &ConfigState,
192 // Continue the demo expression.
193) -> Result<Option<DemoDashboardRuntimeGuard>, Box<dyn std::error::Error + Send + Sync>> {
194 // Validate the configured dashboard IPC section.
195 let Some(config) = validate_dashboard_ipc_config(state.ipc.as_ref())? else {
196 // Return no runtime when IPC is disabled.
197 return Ok(None);
198 // End disabled IPC branch.
199 };
200 // Bind the configured demo IPC socket.
201 let listener = bind_dashboard_listener(&config)?;
202 // Clone the IPC path for cleanup.
203 let ipc_path = config.path.clone();
204 // Clone the target identifier for summaries.
205 let target_id = config.target_id.clone();
206 // Clone the optional registration path for summaries.
207 let registration_path = config
208 // Borrow the optional registration config.
209 .registration
210 // Read the optional registration config.
211 .as_ref()
212 // Clone the configured relay path.
213 .map(|registration| registration.relay_registration_path.clone());
214 // Build the demo service.
215 let service = Arc::new(DemoIpcService::new(config.clone()));
216 // Start the IPC accept loop.
217 let ipc_task = tokio::spawn(run_accept_loop(
218 // Continue the demo expression.
219 listener,
220 // Continue the demo expression.
221 Arc::clone(&service),
222 // Continue the demo expression.
223 target_id.clone(),
224 // Continue the demo expression.
225 ));
226 // Start registration heartbeat when configured.
227 let heartbeat_task = start_heartbeat_task(config);
228 // Return the demo runtime guard.
229 Ok(Some(DemoDashboardRuntimeGuard {
230 // Store IPC path for cleanup.
231 ipc_path,
232 // Store IPC task.
233 ipc_task,
234 // Store optional heartbeat task.
235 heartbeat_task,
236 // Store target identifier.
237 target_id,
238 // Store optional registration path.
239 registration_path,
240 // End guard construction.
241 }))
242 // End runtime startup.
243}
244
245/// Starts the dynamic registration heartbeat when registration is enabled.
246///
247/// # Arguments
248///
249/// - `config`: Validated IPC configuration.
250///
251/// # Returns
252///
253/// Returns an optional task handle.
254fn start_heartbeat_task(config: ValidatedDashboardIpcConfig) -> Option<JoinHandle<()>> {
255 // Skip heartbeat when registration is absent.
256 config.registration.as_ref()?;
257 // Spawn the heartbeat loop.
258 Some(tokio::spawn(async move {
259 // Run registration heartbeat until it stops.
260 if let Err(error) = run_registration_heartbeat(config).await {
261 // Print non-retryable registration failure.
262 eprintln!("demo registration heartbeat stopped: {error}");
263 // End heartbeat error branch.
264 }
265 // End heartbeat task.
266 }))
267 // End heartbeat task startup.
268}
269
270/// Demo IPC request dispatcher.
271struct DemoIpcService {
272 /// Validated IPC configuration.
273 config: ValidatedDashboardIpcConfig,
274 /// Mutable dashboard scenario.
275 scenario: DemoScenario,
276 // Continue the demo expression.
277}
278
279// Continue the demo expression.
280impl DemoIpcService {
281 /// Creates the demo IPC service.
282 ///
283 /// # Arguments
284 ///
285 /// - `config`: Validated IPC configuration.
286 ///
287 /// # Returns
288 ///
289 /// Returns a demo service.
290 fn new(config: ValidatedDashboardIpcConfig) -> Self {
291 // Resolve the display name from registration config.
292 let display_name = config
293 // Borrow optional registration config.
294 .registration
295 // Read optional registration config.
296 .as_ref()
297 // Clone display name when present.
298 .map(|registration| registration.display_name.clone())
299 // Fall back to target identifier.
300 .unwrap_or_else(|| config.target_id.clone());
301 // Create the demo service.
302 Self {
303 // Store validated config.
304 config: config.clone(),
305 // Store mutable scenario.
306 scenario: DemoScenario::new(config.target_id.clone(), display_name),
307 // End service construction.
308 }
309 // End service construction.
310 }
311
312 /// Handles one parsed IPC request.
313 ///
314 /// # Arguments
315 ///
316 /// - `request`: Parsed IPC request.
317 ///
318 /// # Returns
319 ///
320 /// Returns an IPC response.
321 async fn handle_request(&self, request: IpcRequest) -> IpcResponse {
322 // Dispatch the request.
323 match self.dispatch(&request).await {
324 // Return success response.
325 Ok(result) => IpcResponse::ok(request.request_id, result),
326 // Return error response.
327 Err(error) => IpcResponse::error(request.request_id, error),
328 // End dispatch match.
329 }
330 // End request handling.
331 }
332
333 /// Dispatches one request by method.
334 ///
335 /// # Arguments
336 ///
337 /// - `request`: Parsed IPC request.
338 ///
339 /// # Returns
340 ///
341 /// Returns a typed IPC result.
342 async fn dispatch(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
343 // Parse the request method.
344 let method = IpcMethod::parse(&request.method)?;
345 // Dispatch by method.
346 match method {
347 // Return protocol hello response.
348 IpcMethod::Hello => Ok(IpcResult::Hello {
349 // Include protocol version.
350 protocol_version: DASHBOARD_IPC_PROTOCOL_VERSION.to_owned(),
351 // Include registration payload.
352 registration: build_registration_payload(&self.config)?,
353 // End hello payload.
354 }),
355 // Return the current demo state.
356 IpcMethod::CurrentState => {
357 // Build current scenario state.
358 let state = self.scenario.state();
359 // Return state payload.
360 Ok(IpcResult::State {
361 // Include target identifier.
362 target_id: state.target.target_id.clone(),
363 // Include boxed dashboard state.
364 state: Box::new(state),
365 // End state payload.
366 })
367 // End current state branch.
368 }
369 // Accept event subscription.
370 IpcMethod::EventsSubscribe => Ok(self.subscription("events")),
371 // Accept log subscription.
372 IpcMethod::LogsTail => Ok(self.subscription("logs")),
373 // Dispatch control command methods.
374 IpcMethod::CommandRestartChild
375 // Continue the demo expression.
376 | IpcMethod::CommandPauseChild
377 // Continue the demo expression.
378 | IpcMethod::CommandResumeChild
379 // Continue the demo expression.
380 | IpcMethod::CommandQuarantineChild
381 // Continue the demo expression.
382 | IpcMethod::CommandRemoveChild
383 // Continue the demo expression.
384 | IpcMethod::CommandAddChild
385 // Continue the demo expression.
386 | IpcMethod::CommandShutdownTree => self.command_result(request),
387 // End method match.
388 }
389 // End dispatch.
390 }
391
392 /// Builds one subscription response.
393 ///
394 /// # Arguments
395 ///
396 /// - `subscription`: Subscription kind.
397 ///
398 /// # Returns
399 ///
400 /// Returns a subscription result.
401 fn subscription(&self, subscription: &str) -> IpcResult {
402 // Build the subscription payload.
403 IpcResult::Subscription {
404 // Include target identifier.
405 target_id: self.scenario.target_id().to_owned(),
406 // Include subscription kind.
407 subscription: subscription.to_owned(),
408 // End subscription payload.
409 }
410 // End subscription construction.
411 }
412
413 /// Handles one command request.
414 ///
415 /// # Arguments
416 ///
417 /// - `request`: IPC request.
418 ///
419 /// # Returns
420 ///
421 /// Returns a command result IPC payload.
422 fn command_result(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
423 // Decode command parameters.
424 let command = decode_command_params(request)?;
425 // Apply the command to the scenario.
426 let result = self.scenario.command_result(command)?;
427 // Return command result payload.
428 Ok(IpcResult::CommandResult {
429 // Include target identifier.
430 target_id: self.scenario.target_id().to_owned(),
431 // Include command result.
432 result,
433 // End command result payload.
434 })
435 // End command result handling.
436 }
437 // Continue the demo expression.
438}
439
440/// Accepts demo IPC connections until the task is aborted.
441///
442/// # Arguments
443///
444/// - `listener`: Bound Unix listener.
445/// - `service`: Shared demo service.
446/// - `target_id`: Target process identifier.
447///
448/// # Returns
449///
450/// This async task has no returned value.
451async fn run_accept_loop(listener: UnixListener, service: Arc<DemoIpcService>, target_id: String) {
452 // Track connection tasks.
453 let mut connections = JoinSet::new();
454 // Accept connections until listener failure.
455 loop {
456 // Wait for either a new connection or a completed task.
457 tokio::select! {
458 // Accept one socket connection.
459 accepted = listener.accept() => {
460 // Handle accept result.
461 match accepted {
462 // Spawn a connection task.
463 Ok((stream, _)) => {
464 // Clone the shared service.
465 let service = Arc::clone(&service);
466 // Clone the target identifier.
467 let target_id = target_id.clone();
468 // Spawn the per-connection task.
469 connections.spawn(async move {
470 // Handle the socket connection.
471 handle_connection(stream, service, target_id).await
472 // End connection task.
473 });
474 // Continue the demo expression.
475 }
476 // Stop when accept fails.
477 Err(error) => {
478 // Print accept failure.
479 eprintln!("demo IPC accept loop stopped: {error}");
480 // Leave the accept loop.
481 break;
482 // Continue the demo expression.
483 }
484 // End accept match.
485 }
486 // Continue the demo expression.
487 }
488 // Collect completed connection tasks.
489 Some(joined) = connections.join_next() => {
490 // Report task failures.
491 if let Err(error) = joined {
492 // Print task failure.
493 eprintln!("demo IPC connection task failed: {error}");
494 // End task error branch.
495 }
496 // Continue the demo expression.
497 }
498 // Continue the demo expression.
499 }
500 // Continue accept loop.
501 }
502 // End accept loop.
503}
504
505/// Handles one newline-delimited JSON IPC connection.
506///
507/// # Arguments
508///
509/// - `stream`: Accepted Unix socket.
510/// - `service`: Shared demo service.
511/// - `target_id`: Target process identifier.
512///
513/// # Returns
514///
515/// Returns success when the socket closes cleanly.
516async fn handle_connection(
517 // Continue the demo expression.
518 stream: UnixStream,
519 // Continue the demo expression.
520 service: Arc<DemoIpcService>,
521 // Continue the demo expression.
522 target_id: String,
523 // Continue the demo expression.
524) -> Result<(), DashboardError> {
525 // Wrap the stream in a line reader.
526 let mut reader = BufReader::new(stream);
527 // Read requests until EOF.
528 loop {
529 // Allocate the request line.
530 let mut line = String::new();
531 // Read one newline-delimited request.
532 let bytes = reader.read_line(&mut line).await.map_err(|error| {
533 // Build read error.
534 io_error(
535 // Continue the demo expression.
536 "ipc_read_failed",
537 // Continue the demo expression.
538 "ipc_read",
539 // Continue the demo expression.
540 Some(target_id.clone()),
541 // Continue the demo expression.
542 error,
543 // Continue the demo expression.
544 )
545 // End read error construction.
546 })?;
547 // Stop when the peer closes the socket.
548 if bytes == 0 {
549 // Return clean close.
550 return Ok(());
551 // End EOF branch.
552 }
553 // Convert the request line into a response.
554 let response = response_for_line(&service, line.trim_end()).await;
555 // Write the response to the socket.
556 write_response(&mut reader, &response, &target_id).await?;
557 // Continue reading requests.
558 }
559 // End connection handling.
560}
561
562/// Converts one request line into a response.
563///
564/// # Arguments
565///
566/// - `service`: Demo IPC service.
567/// - `line`: One request line.
568///
569/// # Returns
570///
571/// Returns an IPC response.
572async fn response_for_line(service: &DemoIpcService, line: &str) -> IpcResponse {
573 // Parse the line.
574 match parse_request_line(line) {
575 // Dispatch parsed requests.
576 Ok(request) => service.handle_request(request).await,
577 // Return protocol errors.
578 Err(error) => IpcResponse::error("invalid-request", error),
579 // End parse match.
580 }
581 // End response conversion.
582}
583
584/// Writes one response line to the socket.
585///
586/// # Arguments
587///
588/// - `reader`: Socket reader wrapper.
589/// - `response`: IPC response.
590/// - `target_id`: Target process identifier.
591///
592/// # Returns
593///
594/// Returns success after the response is written.
595async fn write_response(
596 // Continue the demo expression.
597 reader: &mut BufReader<UnixStream>,
598 // Continue the demo expression.
599 response: &IpcResponse,
600 // Continue the demo expression.
601 target_id: &str,
602 // Continue the demo expression.
603) -> Result<(), DashboardError> {
604 // Serialize the response as one line.
605 let line = response_to_line(response)?;
606 // Write the response line.
607 reader
608 // Access the underlying stream.
609 .get_mut()
610 // Write bytes to the peer.
611 .write_all(line.as_bytes())
612 // Await completion.
613 .await
614 // Convert I/O failure into dashboard error.
615 .map_err(|error| {
616 // Continue the demo expression.
617 io_error(
618 // Continue the demo expression.
619 "ipc_write_failed",
620 // Continue the demo expression.
621 "ipc_write",
622 // Continue the demo expression.
623 Some(target_id.to_owned()),
624 // Continue the demo expression.
625 error,
626 // Continue the demo expression.
627 )
628 // Continue the demo expression.
629 })
630 // End response write.
631}
632
633/// Creates a structured IPC runtime I/O error.
634///
635/// # Arguments
636///
637/// - `code`: Error code.
638/// - `stage`: Error stage.
639/// - `target_id`: Optional target process identifier.
640/// - `error`: Source I/O error.
641///
642/// # Returns
643///
644/// Returns a dashboard error.
645fn io_error(
646 // Continue the demo expression.
647 code: &str,
648 // Continue the demo expression.
649 stage: &str,
650 // Continue the demo expression.
651 target_id: Option<String>,
652 // Continue the demo expression.
653 error: std::io::Error,
654 // Continue the demo expression.
655) -> DashboardError {
656 // Create a retryable I/O error.
657 DashboardError::new(code, stage, target_id, error.to_string(), true)
658 // End I/O error construction.
659}