kapot_scheduler/scheduler_server/
external_scaler.rs1use crate::scheduler_server::externalscaler::{
19 external_scaler_server::ExternalScaler, GetMetricSpecResponse, GetMetricsRequest,
20 GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, ScaledObjectRef,
21};
22use crate::scheduler_server::SchedulerServer;
23use datafusion_proto::logical_plan::AsLogicalPlan;
24use datafusion_proto::physical_plan::AsExecutionPlan;
25
26use tonic::{Request, Response};
27
/// Metric name advertised by `get_metric_spec` and paired with
/// `pending_job_number()` in `get_metrics`.
const PENDING_JOBS_METRIC_NAME: &str = "pending_jobs";

/// Metric name paired with `running_job_number()` in `get_metrics`.
const RUNNING_JOBS_METRIC_NAME: &str = "running_jobs";
30
31#[tonic::async_trait]
32impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
33 for SchedulerServer<T, U>
34{
35 async fn is_active(
36 &self,
37 _request: Request<ScaledObjectRef>,
38 ) -> Result<Response<IsActiveResponse>, tonic::Status> {
39 Ok(Response::new(IsActiveResponse { result: true }))
40 }
41
42 async fn get_metric_spec(
43 &self,
44 _request: Request<ScaledObjectRef>,
45 ) -> Result<Response<GetMetricSpecResponse>, tonic::Status> {
46 Ok(Response::new(GetMetricSpecResponse {
47 metric_specs: vec![MetricSpec {
48 metric_name: PENDING_JOBS_METRIC_NAME.to_string(),
49 target_size: 0,
50 }],
51 }))
52 }
53
54 async fn get_metrics(
55 &self,
56 _request: Request<GetMetricsRequest>,
57 ) -> Result<Response<GetMetricsResponse>, tonic::Status> {
58 Ok(Response::new(GetMetricsResponse {
59 metric_values: vec![
60 MetricValue {
61 metric_name: PENDING_JOBS_METRIC_NAME.to_string(),
62 metric_value: self.pending_job_number() as i64,
63 },
64 MetricValue {
65 metric_name: RUNNING_JOBS_METRIC_NAME.to_string(),
66 metric_value: self.running_job_number() as i64,
67 },
68 ],
69 }))
70 }
71}