1use crate::identity::rejection::{RejectionInfo, RejectionReason, TargetRegion};
16use crate::identity::restart::RestartManager;
17use crate::network::P2PEvent;
18use serde::{Deserialize, Serialize};
19use std::sync::Arc;
20use tokio::sync::broadcast;
21use tracing::{debug, info};
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct RejectionMessage {
26 pub reason: RejectionReason,
28 pub message: String,
30 pub suggested_target: Option<TargetRegion>,
32}
33
34pub struct ControlMessageHandler {
36 restart_manager: Arc<RestartManager>,
37}
38
39impl ControlMessageHandler {
40 pub fn new(restart_manager: Arc<RestartManager>) -> Self {
42 Self { restart_manager }
43 }
44
45 pub async fn start(self: Arc<Self>, mut events: broadcast::Receiver<P2PEvent>) {
47 tokio::spawn(async move {
48 info!("Control message handler started");
49
50 while let Ok(event) = events.recv().await {
51 if let P2PEvent::Message {
52 topic,
53 source,
54 data,
55 } = event
56 && topic == "control"
57 {
58 self.handle_control_message(&source, &data).await;
59 }
60 }
61
62 info!("Control message handler stopped");
63 });
64 }
65
66 async fn handle_control_message(&self, source: &str, data: &[u8]) {
68 if let Ok(rejection) = serde_json::from_slice::<RejectionMessage>(data) {
70 info!(
71 "Received rejection from {}: {} ({:?})",
72 source, rejection.message, rejection.reason
73 );
74
75 let info = RejectionInfo::new(rejection.reason)
77 .with_message(rejection.message)
78 .with_rejecting_node(source);
79
80 let info = if let Some(target) = rejection.suggested_target {
82 info.with_suggested_target(target)
83 } else {
84 info
85 };
86
87 self.restart_manager.handle_rejection(info).await;
89 } else {
90 debug!("Received unknown control message from {}", source);
91 }
92 }
93}