saorsa_core/
control.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9
10//! Control module for network-level control messages and handling.
11//!
12//! This module defines messages used for network control, such as connection
13//! rejection notifications, and provides handlers for processing them.
14
15use crate::identity::rejection::{RejectionInfo, RejectionReason, TargetRegion};
16use crate::identity::restart::RestartManager;
17use crate::network::P2PEvent;
18use serde::{Deserialize, Serialize};
19use std::sync::Arc;
20use tokio::sync::broadcast;
21use tracing::{debug, info};
22
23/// Message sent to a peer when their connection is rejected.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct RejectionMessage {
26    /// The reason for rejection.
27    pub reason: RejectionReason,
28    /// Optional message explaining the rejection.
29    pub message: String,
30    /// Optional suggestion for where to connect instead.
31    pub suggested_target: Option<TargetRegion>,
32}
33
34/// Handler for control messages.
35pub struct ControlMessageHandler {
36    restart_manager: Arc<RestartManager>,
37}
38
39impl ControlMessageHandler {
40    /// Create a new control message handler.
41    pub fn new(restart_manager: Arc<RestartManager>) -> Self {
42        Self { restart_manager }
43    }
44
45    /// Start listening for control messages.
46    pub async fn start(self: Arc<Self>, mut events: broadcast::Receiver<P2PEvent>) {
47        tokio::spawn(async move {
48            info!("Control message handler started");
49
50            while let Ok(event) = events.recv().await {
51                if let P2PEvent::Message {
52                    topic,
53                    source,
54                    data,
55                } = event
56                    && topic == "control"
57                {
58                    self.handle_control_message(&source, &data).await;
59                }
60            }
61
62            info!("Control message handler stopped");
63        });
64    }
65
66    /// Handle a received control message.
67    async fn handle_control_message(&self, source: &str, data: &[u8]) {
68        // Try to deserialize as RejectionMessage
69        if let Ok(rejection) = serde_json::from_slice::<RejectionMessage>(data) {
70            info!(
71                "Received rejection from {}: {} ({:?})",
72                source, rejection.message, rejection.reason
73            );
74
75            // Convert to RejectionInfo
76            let info = RejectionInfo::new(rejection.reason)
77                .with_message(rejection.message)
78                .with_rejecting_node(source);
79
80            // If suggested target is present, add it
81            let info = if let Some(target) = rejection.suggested_target {
82                info.with_suggested_target(target)
83            } else {
84                info
85            };
86
87            // Trigger restart manager
88            self.restart_manager.handle_rejection(info);
89        } else {
90            debug!("Received unknown control message from {}", source);
91        }
92    }
93}