1use crate::control::command::CommandResult;
8use crate::control::handle::SupervisorHandle;
9use crate::dashboard::config::ValidatedDashboardIpcConfig;
10use crate::dashboard::error::DashboardError;
11use crate::dashboard::model::{
12 ControlCommandKind, ControlCommandRequest, ControlCommandResult, DashboardCurrentState,
13 DashboardState, TargetProcessRegistration, dashboard_command_result_value,
14 runtime_state_from_child_runtime_record,
15};
16use crate::dashboard::protocol::{
17 DASHBOARD_IPC_PROTOCOL_VERSION, IpcMethod, IpcRequest, IpcResponse, IpcResult,
18 decode_command_params,
19};
20use crate::dashboard::registration::build_registration_payload;
21use crate::dashboard::state::{DashboardStateInput, build_dashboard_state};
22use crate::id::types::{ChildId, SupervisorPath};
23use crate::ipc::security::peer_identity::PeerIdentity;
24use crate::ipc::security::{CheckOutcome, IpcSecurityPipeline};
25use crate::journal::ring::EventJournal;
26use crate::spec::supervisor::SupervisorSpec;
27use crate::state::supervisor::SupervisorState;
28use std::os::unix::fs::FileTypeExt;
29use std::os::unix::net::UnixStream as StdUnixStream;
30use std::sync::{Arc, Mutex};
31use tokio::net::UnixListener;
32
33pub struct DashboardIpcService {
35 config: ValidatedDashboardIpcConfig,
37 spec: SupervisorSpec,
39 state: SupervisorState,
41 journal: EventJournal,
43 handle: Option<SupervisorHandle>,
45 state_generation: u64,
47 security_pipeline: Option<Arc<Mutex<IpcSecurityPipeline>>>,
49}
50
51impl DashboardIpcService {
52 pub fn new(
65 config: ValidatedDashboardIpcConfig,
66 spec: SupervisorSpec,
67 state: SupervisorState,
68 journal: EventJournal,
69 ) -> Self {
70 Self {
71 config,
72 spec,
73 state,
74 journal,
75 handle: None,
76 state_generation: 1,
77 security_pipeline: None,
78 }
79 }
80
81 pub fn with_handle(mut self, handle: SupervisorHandle) -> Self {
91 self.handle = Some(handle);
92 self
93 }
94
95 pub fn with_security_pipeline(mut self, pipeline: IpcSecurityPipeline) -> Self {
105 self.security_pipeline = Some(Arc::new(Mutex::new(pipeline)));
106 self
107 }
108
109 pub fn registration_payload(&self) -> Result<TargetProcessRegistration, DashboardError> {
119 build_registration_payload(&self.config)
120 }
121
122 pub async fn handle_request(
140 &self,
141 request: IpcRequest,
142 peer: &PeerIdentity,
143 connection_id: &str,
144 raw_body_len: usize,
145 ) -> IpcResponse {
146 let method = request.method.clone();
147 let request_id = request.request_id.clone();
148 let is_high_risk = is_high_risk_command(&method);
149
150 if let Some(ref pipeline) = self.security_pipeline {
151 let mut guard = pipeline.lock().unwrap();
152
153 match guard.check(&method, &request_id, raw_body_len, peer, connection_id) {
154 CheckOutcome::Denied(err) => {
155 let err_code = err.code.clone();
156 self.audit_or_fail(
158 &mut guard,
159 &method,
160 peer,
161 false,
162 Some(&err),
163 &err_code,
164 is_high_risk,
165 &request_id,
166 );
167 return IpcResponse::error(request.request_id.clone(), err);
168 }
169 CheckOutcome::Passed => {}
170 }
171
172 if let Some(cached_json) = guard.check_idempotency(&request_id) {
174 let method = method.clone();
175 let peer_clone = peer.clone();
176 drop(guard);
177 if let Some(ref pipeline) = self.security_pipeline {
179 let mut guard = pipeline.lock().unwrap();
180 self.audit_or_fail(
181 &mut guard,
182 &method,
183 &peer_clone,
184 true,
185 None,
186 "c8_idempotency_cache_hit",
187 is_high_risk,
188 &request_id,
189 );
190 }
191 return serde_json::from_str(&cached_json).unwrap_or_else(|_| {
193 IpcResponse::error(
194 request_id,
195 DashboardError::new(
196 "idempotency_cache_corrupted",
197 "c8_idempotency",
198 Some(self.config.target_id.clone()),
199 "cached response failed to deserialize".to_owned(),
200 false,
201 ),
202 )
203 });
204 }
205 drop(guard);
206 }
207
208 let dispatch_result = self.dispatch(&request).await;
210 let response = match &dispatch_result {
211 Ok(result) => IpcResponse::ok(request.request_id.clone(), result.clone()),
212 Err(error) => IpcResponse::error(request.request_id.clone(), error.clone()),
213 };
214
215 if let Some(ref pipeline) = self.security_pipeline {
217 let mut guard = pipeline.lock().unwrap();
218
219 if let Ok(response_json) = serde_json::to_string(&response) {
221 guard.cache_result(&request_id, &response_json);
222 }
223
224 let (allowed, denial_error, denial_code): (bool, Option<&DashboardError>, &str) =
226 match &dispatch_result {
227 Ok(_) => (true, None, "dispatch_ok"),
228 Err(err) => (false, Some(err), err.code.as_str()),
229 };
230 self.audit_or_fail(
231 &mut guard,
232 &method,
233 peer,
234 allowed,
235 denial_error,
236 denial_code,
237 is_high_risk,
238 &request_id,
239 );
240 }
241
242 response
243 }
244
245 #[allow(clippy::too_many_arguments)]
248 fn audit_or_fail(
249 &self,
250 guard: &mut std::sync::MutexGuard<'_, IpcSecurityPipeline>,
251 method: &str,
252 peer: &PeerIdentity,
253 allowed: bool,
254 denial_error: Option<&DashboardError>,
255 denial_code: &str,
256 is_high_risk: bool,
257 request_id: &str,
258 ) {
259 if let Err(_err) = guard.write_audit(method, peer, allowed, denial_error, denial_code) {
260 let _count = crate::ipc::security::audit::alerts::increment_failure_count();
261 tracing::error!(
262 target: "rust_supervisor::ipc::security::audit",
263 %method,
264 high_risk = is_high_risk,
265 "audit write failed"
266 );
267 if is_high_risk {
268 tracing::error!(
276 target: "rust_supervisor::ipc::security::audit",
277 %method,
278 %request_id,
279 "HIGH-RISK command denied because audit write failed (fail-closed)"
280 );
281 }
282 }
283 }
284
285 async fn dispatch(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
295 let method = IpcMethod::parse(&request.method)?;
296 match method {
297 IpcMethod::Hello => Ok(IpcResult::Hello {
298 protocol_version: DASHBOARD_IPC_PROTOCOL_VERSION.to_owned(),
299 registration: self.registration_payload()?,
300 }),
301 IpcMethod::CurrentState => {
302 let state = self.current_dashboard_state().await?;
303 Ok(IpcResult::State {
304 target_id: state.target.target_id.clone(),
305 state: Box::new(state),
306 })
307 }
308 IpcMethod::EventsSubscribe => {
309 require_session_trigger(request, &self.config.target_id)?;
310 Ok(IpcResult::Subscription {
311 target_id: self.config.target_id.clone(),
312 subscription: "events".to_owned(),
313 })
314 }
315 IpcMethod::LogsTail => {
316 require_session_trigger(request, &self.config.target_id)?;
317 Ok(IpcResult::Subscription {
318 target_id: self.config.target_id.clone(),
319 subscription: "logs".to_owned(),
320 })
321 }
322 IpcMethod::CommandRestartChild
323 | IpcMethod::CommandPauseChild
324 | IpcMethod::CommandResumeChild
325 | IpcMethod::CommandQuarantineChild
326 | IpcMethod::CommandRemoveChild
327 | IpcMethod::CommandAddChild
328 | IpcMethod::CommandShutdownTree => self.command_result(request).await,
329 }
330 }
331
332 pub async fn current_dashboard_state(&self) -> Result<DashboardState, DashboardError> {
342 let registration = self.registration_payload().ok();
343 let mut state = build_dashboard_state(
344 DashboardStateInput {
345 target_id: self.config.target_id.clone(),
346 display_name: registration
347 .as_ref()
348 .map(|registration| registration.display_name.clone())
349 .unwrap_or_else(|| self.config.target_id.clone()),
350 state_generation: self.state_generation,
351 recent_limit: 128,
352 },
353 &self.spec,
354 &self.state,
355 &self.journal,
356 );
357 if let Some(handle) = self.handle.as_ref() {
358 let result = handle.current_state().await.map_err(|error| {
359 DashboardError::new(
360 "current_state_failed",
361 "state",
362 Some(self.config.target_id.clone()),
363 error.to_string(),
364 true,
365 )
366 })?;
367 if let CommandResult::CurrentState {
368 state: runtime_state,
369 } = result
370 {
371 let dashboard_state = DashboardCurrentState::from_current_state(&runtime_state);
372 state.runtime_state = runtime_state
374 .child_runtime_records
375 .iter()
376 .map(|record| {
377 runtime_state_from_child_runtime_record(
378 record,
379 runtime_state.shutdown_completed,
380 )
381 })
382 .collect();
383 state.child_runtime_records = dashboard_state.child_runtime_records;
384 }
385 }
386 Ok(state)
387 }
388
389 async fn command_result(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
399 let command = decode_command_params(request)?;
400 validate_command(&command)?;
401 if command.target_id != self.config.target_id {
402 return Err(DashboardError::validation(
403 "command_validate",
404 Some(self.config.target_id.clone()),
405 "command target_id must match target process",
406 ));
407 }
408 let result = if let Some(handle) = self.handle.as_ref() {
409 execute_command(handle, &command).await
410 } else {
411 Err(DashboardError::target_unavailable(
412 "command_dispatch",
413 command.target_id.clone(),
414 "runtime control handle is not attached",
415 ))
416 };
417 let result = match result {
418 Ok(result) => {
419 let state_delta = dashboard_command_result_value(&result).map_err(|error| {
420 DashboardError::new(
421 "command_result_model_failed",
422 "command_dispatch",
423 Some(command.target_id.clone()),
424 format!("failed to map command result: {error}"),
425 false,
426 )
427 })?;
428 ControlCommandResult {
429 command_id: command.command_id.clone(),
430 target_id: command.target_id.clone(),
431 accepted: true,
432 status: "completed".to_owned(),
433 error: None,
434 state_delta: Some(state_delta),
435 completed_at_unix_nanos: Some(unix_nanos_now()),
436 }
437 }
438 Err(error) => ControlCommandResult {
439 command_id: command.command_id.clone(),
440 target_id: command.target_id.clone(),
441 accepted: false,
442 status: "failed".to_owned(),
443 error: Some(error),
444 state_delta: None,
445 completed_at_unix_nanos: Some(unix_nanos_now()),
446 },
447 };
448 Ok(IpcResult::CommandResult {
449 target_id: command.target_id,
450 result,
451 })
452 }
453}
454
455pub fn bind_dashboard_listener(
465 config: &ValidatedDashboardIpcConfig,
466) -> Result<UnixListener, DashboardError> {
467 prepare_socket_path(config)?;
468 if let Some(parent) = config.path.parent() {
470 std::fs::create_dir_all(parent).map_err(|error| {
471 DashboardError::new(
472 "ipc_parent_dir_creation_failed",
473 "ipc_bind",
474 Some(config.target_id.clone()),
475 format!("failed to create IPC parent directory: {error}"),
476 false,
477 )
478 })?;
479 }
480 UnixListener::bind(&config.path).map_err(|error| {
481 DashboardError::new(
482 "ipc_bind_failed",
483 "ipc_bind",
484 Some(config.target_id.clone()),
485 format!("failed to bind target IPC socket: {error}"),
486 true,
487 )
488 })
489}
490
491fn prepare_socket_path(config: &ValidatedDashboardIpcConfig) -> Result<(), DashboardError> {
501 let metadata = match std::fs::symlink_metadata(&config.path) {
502 Ok(metadata) => metadata,
503 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
504 Err(error) => {
505 return Err(DashboardError::new(
506 "ipc_path_metadata_failed",
507 "ipc_bind",
508 Some(config.target_id.clone()),
509 format!("failed to inspect IPC path: {error}"),
510 false,
511 ));
512 }
513 };
514 match config.bind_mode {
515 crate::config::configurable::DashboardIpcBindMode::CreateNew => {
516 Err(DashboardError::validation(
517 "ipc_bind",
518 Some(config.target_id.clone()),
519 "IPC path already exists and bind_mode is create_new",
520 ))
521 }
522 crate::config::configurable::DashboardIpcBindMode::ReplaceStale => {
523 if metadata.file_type().is_symlink() {
524 return Err(DashboardError::validation(
525 "ipc_bind",
526 Some(config.target_id.clone()),
527 "IPC path must not be a symlink",
528 ));
529 }
530 if !metadata.file_type().is_socket() {
531 return Err(DashboardError::validation(
532 "ipc_bind",
533 Some(config.target_id.clone()),
534 "IPC path must be a Unix socket before stale replacement",
535 ));
536 }
537 if StdUnixStream::connect(&config.path).is_ok() {
538 return Err(DashboardError::validation(
539 "ipc_bind",
540 Some(config.target_id.clone()),
541 "IPC path is served by a live process",
542 ));
543 }
544 crate::ipc::security::peer_identity::prepare_socket_path_for_bind(&config.path)?;
546 std::fs::remove_file(&config.path).map_err(|error| {
547 DashboardError::new(
548 "ipc_stale_remove_failed",
549 "ipc_bind",
550 Some(config.target_id.clone()),
551 format!("failed to remove stale IPC path: {error}"),
552 true,
553 )
554 })
555 }
556 }
557}
558
559fn require_session_trigger(request: &IpcRequest, target_id: &str) -> Result<(), DashboardError> {
570 let established = request
571 .params
572 .get("session_established")
573 .and_then(serde_json::Value::as_bool)
574 .unwrap_or(false);
575 if established {
576 Ok(())
577 } else {
578 Err(DashboardError::new(
579 "session_required",
580 "subscription",
581 Some(target_id.to_owned()),
582 "event and log subscription must be triggered by an established dashboard session",
583 false,
584 ))
585 }
586}
587
588pub fn validate_command(command: &ControlCommandRequest) -> Result<(), DashboardError> {
598 if command.reason.trim().is_empty() {
599 return Err(DashboardError::validation(
600 "command_validate",
601 Some(command.target_id.clone()),
602 "command reason must not be empty",
603 ));
604 }
605 if command.requested_by.trim().is_empty() {
606 return Err(DashboardError::validation(
607 "command_validate",
608 Some(command.target_id.clone()),
609 "requested_by must be derived by relay",
610 ));
611 }
612 if matches!(
613 command.command,
614 ControlCommandKind::ShutdownTree
615 | ControlCommandKind::RemoveChild
616 | ControlCommandKind::AddChild
617 ) && !command.confirmed
618 {
619 return Err(DashboardError::validation(
620 "command_validate",
621 Some(command.target_id.clone()),
622 "dangerous command requires confirmation",
623 ));
624 }
625 Ok(())
626}
627
628async fn execute_command(
639 handle: &SupervisorHandle,
640 command: &ControlCommandRequest,
641) -> Result<CommandResult, DashboardError> {
642 let result = match command.command {
643 ControlCommandKind::RestartChild => {
644 handle
645 .restart_child(child_id(command)?, &command.requested_by, &command.reason)
646 .await
647 }
648 ControlCommandKind::PauseChild => {
649 handle
650 .pause_child(child_id(command)?, &command.requested_by, &command.reason)
651 .await
652 }
653 ControlCommandKind::ResumeChild => {
654 handle
655 .resume_child(child_id(command)?, &command.requested_by, &command.reason)
656 .await
657 }
658 ControlCommandKind::QuarantineChild => {
659 handle
660 .quarantine_child(child_id(command)?, &command.requested_by, &command.reason)
661 .await
662 }
663 ControlCommandKind::RemoveChild => {
664 handle
665 .remove_child(child_id(command)?, &command.requested_by, &command.reason)
666 .await
667 }
668 ControlCommandKind::AddChild => {
669 handle
670 .add_child(
671 SupervisorPath::root(),
672 command.target.child_manifest.clone().unwrap_or_default(),
673 &command.requested_by,
674 &command.reason,
675 )
676 .await
677 }
678 ControlCommandKind::ShutdownTree => {
679 handle
680 .shutdown_tree(&command.requested_by, &command.reason)
681 .await
682 }
683 };
684 result.map_err(|error| {
685 DashboardError::new(
686 "command_failed",
687 "command_dispatch",
688 Some(command.target_id.clone()),
689 error.to_string(),
690 true,
691 )
692 })
693}
694
695fn child_id(command: &ControlCommandRequest) -> Result<ChildId, DashboardError> {
705 let child_path = command.target.child_path.as_deref().ok_or_else(|| {
706 DashboardError::validation(
707 "command_validate",
708 Some(command.target_id.clone()),
709 "child_path is required for child command",
710 )
711 })?;
712 let value = child_path
713 .rsplit('/')
714 .find(|segment| !segment.is_empty())
715 .unwrap_or(child_path);
716 Ok(ChildId::new(value))
717}
718
/// Returns the current wall-clock time in nanoseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` instead of an error.
fn unix_nanos_now() -> u128 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos())
}
734
/// Reports whether `method` is one of the high-risk control commands.
///
/// Only the exact method names listed below qualify. In this module the flag
/// escalates logging when an audit write fails.
fn is_high_risk_command(method: &str) -> bool {
    const HIGH_RISK_METHODS: [&str; 5] = [
        "command.restart_child",
        "command.quarantine_child",
        "command.remove_child",
        "command.shutdown_tree",
        "command.add_child",
    ];
    HIGH_RISK_METHODS.contains(&method)
}