blueprint_qos/
service.rs

1use blueprint_core::info;
2use std::sync::Arc;
3use tonic::{Request, Response, Status, transport::Server};
4
5use crate::error::{Error, Result};
6use crate::metrics::MetricsProvider;
7use crate::proto::qos_metrics_server::{QosMetrics, QosMetricsServer};
8use crate::proto::{
9    BlueprintMetrics as ProtoBlueprintMetrics, GetBlueprintMetricsRequest,
10    GetBlueprintMetricsResponse, GetHistoricalMetricsRequest, GetHistoricalMetricsResponse,
11    GetResourceUsageRequest, GetResourceUsageResponse, GetStatusRequest, GetStatusResponse,
12    SystemMetrics as ProtoSystemMetrics,
13};
14
15/// Implementation of the `QoS` gRPC service for exposing metrics via a remote API.
16/// This service provides endpoints for retrieving blueprint status, resource usage,
17/// custom metrics, and historical data over a standardized protocol.
18#[derive(Debug)]
19pub struct QosMetricsService<T> {
20    provider: Arc<T>,
21}
22
23impl<T> QosMetricsService<T>
24where
25    T: MetricsProvider,
26{
27    /// Creates a new `QoS` metrics service with the provided metrics provider.
28    ///
29    /// The provider supplies the actual metric data that will be exposed through the gRPC service.
30    pub fn new(provider: Arc<T>) -> Self {
31        Self { provider }
32    }
33}
34
35#[tonic::async_trait]
36impl<T> QosMetrics for QosMetricsService<T>
37where
38    T: MetricsProvider + 'static,
39{
40    async fn get_status(
41        &self,
42        request: Request<GetStatusRequest>,
43    ) -> std::result::Result<Response<GetStatusResponse>, Status> {
44        let req = request.into_inner();
45        info!(
46            blueprint_id = req.blueprint_id,
47            service_id = req.service_id,
48            "Received GetStatus request"
49        );
50
51        let status = self.provider.get_blueprint_status().await;
52
53        if status.blueprint_id != req.blueprint_id || status.service_id != req.service_id {
54            return Err(Status::not_found("Blueprint or service ID not found"));
55        }
56
57        let response = GetStatusResponse {
58            status_code: status.status_code,
59            status_message: status.status_message,
60            uptime: status.uptime,
61            start_time: status.start_time,
62            last_heartbeat: status.last_heartbeat,
63            timestamp: status.timestamp,
64            service_id: status.service_id,
65            blueprint_id: status.blueprint_id,
66        };
67
68        Ok(Response::new(response))
69    }
70
71    async fn get_resource_usage(
72        &self,
73        request: Request<GetResourceUsageRequest>,
74    ) -> std::result::Result<Response<GetResourceUsageResponse>, Status> {
75        let req = request.into_inner();
76        info!(
77            blueprint_id = req.blueprint_id,
78            service_id = req.service_id,
79            "Received GetResourceUsage request"
80        );
81
82        let status = self.provider.get_blueprint_status().await;
83
84        if status.blueprint_id != req.blueprint_id || status.service_id != req.service_id {
85            return Err(Status::not_found("Blueprint or service ID not found"));
86        }
87
88        let metrics = self.provider.get_system_metrics().await;
89
90        let response = GetResourceUsageResponse {
91            cpu_usage: metrics.cpu_usage,
92            memory_usage: metrics.memory_usage,
93            total_memory: metrics.total_memory,
94            disk_usage: metrics.disk_usage,
95            total_disk: metrics.total_disk,
96            network_rx_bytes: metrics.network_rx_bytes,
97            network_tx_bytes: metrics.network_tx_bytes,
98            timestamp: metrics.timestamp,
99        };
100
101        Ok(Response::new(response))
102    }
103
104    async fn get_blueprint_metrics(
105        &self,
106        request: Request<GetBlueprintMetricsRequest>,
107    ) -> std::result::Result<Response<GetBlueprintMetricsResponse>, Status> {
108        let req = request.into_inner();
109        info!(
110            blueprint_id = req.blueprint_id,
111            service_id = req.service_id,
112            "Received GetBlueprintMetrics request"
113        );
114
115        let status = self.provider.get_blueprint_status().await;
116
117        if status.blueprint_id != req.blueprint_id || status.service_id != req.service_id {
118            return Err(Status::not_found("Blueprint or service ID not found"));
119        }
120
121        let metrics = self.provider.get_blueprint_metrics().await;
122
123        let response = GetBlueprintMetricsResponse {
124            custom_metrics: metrics.custom_metrics,
125            timestamp: metrics.timestamp,
126        };
127
128        Ok(Response::new(response))
129    }
130
131    async fn get_historical_metrics(
132        &self,
133        request: Request<GetHistoricalMetricsRequest>,
134    ) -> std::result::Result<Response<GetHistoricalMetricsResponse>, Status> {
135        let req = request.into_inner();
136        info!(
137            blueprint_id = req.blueprint_id,
138            service_id = req.service_id,
139            "Received GetHistoricalMetrics request"
140        );
141
142        let status = self.provider.get_blueprint_status().await;
143
144        if status.blueprint_id != req.blueprint_id || status.service_id != req.service_id {
145            return Err(Status::not_found("Blueprint or service ID not found"));
146        }
147
148        let system_metrics_history = if req.metrics_type == 0 {
149            self.provider
150                .get_system_metrics_history()
151                .await
152                .into_iter()
153                .map(|m| ProtoSystemMetrics {
154                    cpu_usage: m.cpu_usage,
155                    memory_usage: m.memory_usage,
156                    total_memory: m.total_memory,
157                    disk_usage: m.disk_usage,
158                    total_disk: m.total_disk,
159                    network_rx_bytes: m.network_rx_bytes,
160                    network_tx_bytes: m.network_tx_bytes,
161                    timestamp: m.timestamp,
162                })
163                .collect()
164        } else {
165            Vec::new()
166        };
167
168        let blueprint_metrics_history = if req.metrics_type == 1 {
169            self.provider
170                .get_blueprint_metrics_history()
171                .await
172                .into_iter()
173                .map(|m| ProtoBlueprintMetrics {
174                    custom_metrics: m.custom_metrics,
175                    timestamp: m.timestamp,
176                })
177                .collect()
178        } else {
179            Vec::new()
180        };
181
182        let response = GetHistoricalMetricsResponse {
183            system_metrics: system_metrics_history,
184            blueprint_metrics: blueprint_metrics_history,
185        };
186
187        Ok(Response::new(response))
188    }
189}
190
191/// Starts a gRPC server that exposes `QoS` metrics on the specified address.
192///
193/// This function binds to the provided address and serves the `QosMetrics` gRPC service
194/// using the supplied metrics provider as the data source. It runs indefinitely until
195/// the server is shut down or encounters an error.
196///
197/// # Errors
198/// Returns an error if the server fails to bind to the address, fails to start,
199/// or encounters an error during operation.
200pub async fn run_qos_server<T>(bind_address: String, provider: Arc<T>) -> Result<()>
201where
202    T: MetricsProvider + 'static,
203{
204    let addr = bind_address
205        .parse()
206        .map_err(|e| Error::Other(format!("Failed to parse bind address: {}", e)))?;
207
208    info!("QoS metrics server listening on {}", addr);
209
210    let service = QosMetricsService::new(provider);
211    let server = QosMetricsServer::new(service);
212
213    Server::builder()
214        .add_service(server)
215        .serve(addr)
216        .await
217        .map_err(Error::Grpc)?;
218
219    Ok(())
220}