kapot_scheduler/scheduler_server/
external_scaler.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::scheduler_server::externalscaler::{
19    external_scaler_server::ExternalScaler, GetMetricSpecResponse, GetMetricsRequest,
20    GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, ScaledObjectRef,
21};
22use crate::scheduler_server::SchedulerServer;
23use datafusion_proto::logical_plan::AsLogicalPlan;
24use datafusion_proto::physical_plan::AsExecutionPlan;
25
26use tonic::{Request, Response};
27
28const PENDING_JOBS_METRIC_NAME: &str = "pending_jobs";
29const RUNNING_JOBS_METRIC_NAME: &str = "running_jobs";
30
31#[tonic::async_trait]
32impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
33    for SchedulerServer<T, U>
34{
35    async fn is_active(
36        &self,
37        _request: Request<ScaledObjectRef>,
38    ) -> Result<Response<IsActiveResponse>, tonic::Status> {
39        Ok(Response::new(IsActiveResponse { result: true }))
40    }
41
42    async fn get_metric_spec(
43        &self,
44        _request: Request<ScaledObjectRef>,
45    ) -> Result<Response<GetMetricSpecResponse>, tonic::Status> {
46        Ok(Response::new(GetMetricSpecResponse {
47            metric_specs: vec![MetricSpec {
48                metric_name: PENDING_JOBS_METRIC_NAME.to_string(),
49                target_size: 0,
50            }],
51        }))
52    }
53
54    async fn get_metrics(
55        &self,
56        _request: Request<GetMetricsRequest>,
57    ) -> Result<Response<GetMetricsResponse>, tonic::Status> {
58        Ok(Response::new(GetMetricsResponse {
59            metric_values: vec![
60                MetricValue {
61                    metric_name: PENDING_JOBS_METRIC_NAME.to_string(),
62                    metric_value: self.pending_job_number() as i64,
63                },
64                MetricValue {
65                    metric_name: RUNNING_JOBS_METRIC_NAME.to_string(),
66                    metric_value: self.running_job_number() as i64,
67                },
68            ],
69        }))
70    }
71}