use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::protobuf;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
use ballista_core::serde::protobuf::{
    executor_status, CancelTasksParams, ExecutorHeartbeat, ExecutorStatus,
    RemoveJobDataParams,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use ballista_core::utils::create_grpc_client_connection;
use dashmap::{DashMap, DashSet};
use futures::StreamExt;
use log::{debug, error, info, warn};
use parking_lot::Mutex;
use tonic::transport::Channel;

use crate::config::SlotsPolicy;
use crate::state::backend::{Keyspace, Operation, StateBackendClient, WatchEvent};
use crate::state::execution_graph::RunningTaskInfo;
use crate::state::{decode_into, decode_protobuf, encode_protobuf, with_lock};

type ExecutorClients = Arc<DashMap<String, ExecutorGrpcClient<Channel>>>;
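
/// Represents a reservation of a single task slot on an executor. A reservation may
/// optionally be tied to a specific job via `job_id`.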
#[derive(Clone, Debug)]
pub struct ExecutorReservation {
pub executor_id: String,
pub job_id: Option<String>,
}
impl ExecutorReservation {
pub fn new_free(executor_id: String) -> Self {
Self {
executor_id,
job_id: None,
}
}
pub fn new_assigned(executor_id: String, job_id: String) -> Self {
Self {
executor_id,
job_id: Some(job_id),
}
}
pub fn assign(mut self, job_id: String) -> Self {
self.job_id = Some(job_id);
self
}
pub fn assigned(&self) -> bool {
self.job_id.is_some()
}
}
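
/// Default number of seconds after which an executor is considered expired if no
/// heartbeat has been received.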
pub const DEFAULT_EXECUTOR_TIMEOUT_SECONDS: u64 = 180;
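
/// Tracks executor registration, heartbeats and available task slots, and caches
/// gRPC clients for communicating with executors.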
#[derive(Clone)]
pub(crate) struct ExecutorManager {
slots_policy: SlotsPolicy,
state: Arc<dyn StateBackendClient>,
executor_metadata: Arc<DashMap<String, ExecutorMetadata>>,
executors_heartbeat: Arc<DashMap<String, protobuf::ExecutorHeartbeat>>,
executor_data: Arc<Mutex<HashMap<String, ExecutorData>>>,
dead_executors: Arc<DashSet<String>>,
clients: ExecutorClients,
}
impl ExecutorManager {
pub(crate) fn new(
state: Arc<dyn StateBackendClient>,
slots_policy: SlotsPolicy,
) -> Self {
Self {
slots_policy,
state,
executor_metadata: Arc::new(DashMap::new()),
executors_heartbeat: Arc::new(DashMap::new()),
executor_data: Arc::new(Mutex::new(HashMap::new())),
dead_executors: Arc::new(DashSet::new()),
clients: Default::default(),
}
}
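
    /// Load the current set of active executor heartbeats from the state backend and
    /// start a listener that keeps the in-memory caches up to date.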
pub async fn init(&self) -> Result<()> {
self.init_active_executor_heartbeats().await?;
let heartbeat_listener = ExecutorHeartbeatListener::new(
self.state.clone(),
self.executors_heartbeat.clone(),
self.dead_executors.clone(),
);
heartbeat_listener.start().await
}
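
    /// Reserve up to `n` task slots across alive executors. One `ExecutorReservation`
    /// is returned per reserved slot; fewer than `n` may be returned if not enough
    /// slots are available.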
pub async fn reserve_slots(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
if self.slots_policy.is_local() {
self.reserve_slots_local(n).await
} else {
self.reserve_slots_global(n).await
}
}
async fn reserve_slots_local(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
debug!("Attempting to reserve {} executor slots", n);
let alive_executors = self.get_alive_executors_within_one_minute();
match self.slots_policy {
SlotsPolicy::RoundRobinLocal => {
self.reserve_slots_local_round_robin(n, alive_executors)
.await
}
_ => Err(BallistaError::General(format!(
"Reservation policy {:?} is not supported",
self.slots_policy
))),
}
}
async fn reserve_slots_local_round_robin(
&self,
mut n: u32,
alive_executors: HashSet<String>,
) -> Result<Vec<ExecutorReservation>> {
let mut executor_data = self.executor_data.lock();
let mut available_executor_data: Vec<&mut ExecutorData> = executor_data
.values_mut()
.filter_map(|data| {
(data.available_task_slots > 0
&& alive_executors.contains(&data.executor_id))
.then_some(data)
})
.collect();
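        // Sort executors by available task slots in descending order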
available_executor_data
.sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
let mut reservations: Vec<ExecutorReservation> = vec![];
        loop {
            let n_before = n;
            for data in available_executor_data.iter_mut() {
if n == 0 {
break;
}
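                // The vector is sorted in descending order, so if this executor has no
                // free slots, none of the following executors will have any either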
if data.available_task_slots == 0 {
break;
}
reservations
.push(ExecutorReservation::new_free(data.executor_id.clone()));
data.available_task_slots -= 1;
n -= 1;
}
if n_before == n {
break;
}
}
Ok(reservations)
}
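
    /// Reserve up to `n` task slots using the shared state backend, protected by a
    /// lock on the `Slots` keyspace so that concurrent schedulers do not hand out the
    /// same slots twice.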
async fn reserve_slots_global(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
let lock = self.state.lock(Keyspace::Slots, "global").await?;
with_lock(lock, async {
debug!("Attempting to reserve {} executor slots", n);
let start = Instant::now();
let alive_executors = self.get_alive_executors_within_one_minute();
let (reservations, txn_ops) = match self.slots_policy {
SlotsPolicy::Bias => {
self.reserve_slots_global_bias(n, alive_executors).await?
}
SlotsPolicy::RoundRobin => {
self.reserve_slots_global_round_robin(n, alive_executors)
.await?
}
_ => {
return Err(BallistaError::General(format!(
"Reservation policy {:?} is not supported",
self.slots_policy
)))
}
};
self.state.apply_txn(txn_ops).await?;
let elapsed = start.elapsed();
info!(
"Reserved {} executor slots in {:?}",
reservations.len(),
elapsed
);
Ok(reservations)
})
.await
}
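
    /// Bias strategy: take as many slots as possible from each alive executor before
    /// moving on to the next one.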
async fn reserve_slots_global_bias(
&self,
mut n: u32,
alive_executors: HashSet<String>,
) -> Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
let mut reservations: Vec<ExecutorReservation> = vec![];
let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
for executor_id in alive_executors {
if n == 0 {
break;
}
let value = self.state.get(Keyspace::Slots, &executor_id).await?;
let mut data = decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
let take = std::cmp::min(data.available_task_slots, n);
for _ in 0..take {
reservations.push(ExecutorReservation::new_free(executor_id.clone()));
data.available_task_slots -= 1;
n -= 1;
}
let proto: protobuf::ExecutorData = data.into();
let new_data = encode_protobuf(&proto)?;
txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
}
Ok((reservations, txn_ops))
}
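
    /// Round-robin strategy: spread reservations across alive executors one slot at a
    /// time, starting with the executors that have the most available slots, and only
    /// write back the executors whose slot counts changed.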
async fn reserve_slots_global_round_robin(
&self,
mut n: u32,
alive_executors: HashSet<String>,
) -> Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
let mut reservations: Vec<ExecutorReservation> = vec![];
let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
let all_executor_data = self
.state
.scan(Keyspace::Slots, None)
.await?
.into_iter()
.map(|(_, data)| decode_into::<protobuf::ExecutorData, ExecutorData>(&data))
.collect::<Result<Vec<ExecutorData>>>()?;
let mut available_executor_data: Vec<ExecutorData> = all_executor_data
.into_iter()
.filter_map(|data| {
(data.available_task_slots > 0
&& alive_executors.contains(&data.executor_id))
.then_some(data)
})
.collect();
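        // Sort executors by available task slots in descending order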
available_executor_data
.sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
let mut last_updated_idx = 0usize;
loop {
let n_before = n;
for (idx, data) in available_executor_data.iter_mut().enumerate() {
if n == 0 {
break;
}
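                // The vector is sorted in descending order, so if this executor has no
                // free slots, none of the following executors will have any either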
if data.available_task_slots == 0 {
break;
}
reservations
.push(ExecutorReservation::new_free(data.executor_id.clone()));
data.available_task_slots -= 1;
n -= 1;
if idx >= last_updated_idx {
last_updated_idx = idx + 1;
}
}
if n_before == n {
break;
}
}
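
        // Persist only the executors whose available slots were decremented, i.e.
        // those with an index below `last_updated_idx`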
for (idx, data) in available_executor_data.into_iter().enumerate() {
if idx >= last_updated_idx {
break;
}
let executor_id = data.executor_id.clone();
let proto: protobuf::ExecutorData = data.into();
let new_data = encode_protobuf(&proto)?;
txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
}
Ok((reservations, txn_ops))
}
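
    /// Return unused reservations so that their task slots become available for
    /// scheduling again.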
pub async fn cancel_reservations(
&self,
reservations: Vec<ExecutorReservation>,
) -> Result<()> {
if self.slots_policy.is_local() {
self.cancel_reservations_local(reservations).await
} else {
self.cancel_reservations_global(reservations).await
}
}
async fn cancel_reservations_local(
&self,
reservations: Vec<ExecutorReservation>,
) -> Result<()> {
        let mut executor_slots: HashMap<String, u32> = HashMap::new();
        for reservation in reservations {
            *executor_slots.entry(reservation.executor_id).or_insert(0) += 1;
        }
let mut executor_data = self.executor_data.lock();
for (id, released_slots) in executor_slots.into_iter() {
if let Some(slots) = executor_data.get_mut(&id) {
slots.available_task_slots += released_slots;
} else {
warn!("ExecutorData for {} is not cached in memory", id);
}
}
Ok(())
}
async fn cancel_reservations_global(
&self,
reservations: Vec<ExecutorReservation>,
) -> Result<()> {
let lock = self.state.lock(Keyspace::Slots, "global").await?;
with_lock(lock, async {
let num_reservations = reservations.len();
debug!("Cancelling {} reservations", num_reservations);
let start = Instant::now();
let mut executor_slots: HashMap<String, ExecutorData> = HashMap::new();
for reservation in reservations {
let executor_id = &reservation.executor_id;
if let Some(data) = executor_slots.get_mut(executor_id) {
data.available_task_slots += 1;
} else {
let value = self.state.get(Keyspace::Slots, executor_id).await?;
let mut data =
decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
data.available_task_slots += 1;
executor_slots.insert(executor_id.clone(), data);
}
}
let txn_ops: Vec<(Operation, Keyspace, String)> = executor_slots
.into_iter()
.map(|(executor_id, data)| {
let proto: protobuf::ExecutorData = data.into();
let new_data = encode_protobuf(&proto)?;
Ok((Operation::Put(new_data), Keyspace::Slots, executor_id))
})
.collect::<Result<Vec<_>>>()?;
self.state.apply_txn(txn_ops).await?;
let elapsed = start.elapsed();
info!(
"Cancelled {} reservations in {:?}",
num_reservations, elapsed
);
Ok(())
})
.await
}
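
    /// Send a `CancelTasks` RPC to each executor that is running one of the given tasks.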
pub async fn cancel_running_tasks(&self, tasks: Vec<RunningTaskInfo>) -> Result<()> {
let mut tasks_to_cancel: HashMap<&str, Vec<protobuf::RunningTaskInfo>> =
Default::default();
        for task_info in &tasks {
            tasks_to_cancel
                .entry(task_info.executor_id.as_str())
                .or_default()
                .push(protobuf::RunningTaskInfo {
                    task_id: task_info.task_id as u32,
                    job_id: task_info.job_id.clone(),
                    stage_id: task_info.stage_id as u32,
                    partition_id: task_info.partition_id as u32,
                });
        }
for (executor_id, infos) in tasks_to_cancel {
if let Ok(mut client) = self.get_client(executor_id).await {
client
.cancel_tasks(CancelTasksParams { task_infos: infos })
.await?;
} else {
error!(
"Failed to get client for executor ID {} to cancel tasks",
executor_id
)
}
}
Ok(())
}
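
    /// Schedule removal of the given job's data on all alive executors after
    /// `clean_up_interval` seconds. An interval of 0 disables the clean up.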
pub(crate) fn clean_up_job_data_delayed(
&self,
job_id: String,
clean_up_interval: u64,
) {
if clean_up_interval == 0 {
            info!(
                "The interval is 0 and the clean up for job data {} will not be triggered",
                job_id
            );
return;
}
let executor_manager = self.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(clean_up_interval)).await;
executor_manager.clean_up_job_data_inner(job_id).await;
});
}
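
    /// Immediately remove the given job's data on all alive executors.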
pub fn clean_up_job_data(&self, job_id: String) {
let executor_manager = self.clone();
tokio::spawn(async move {
executor_manager.clean_up_job_data_inner(job_id).await;
});
}
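
    /// Ask every alive executor to remove its data for the given job.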
async fn clean_up_job_data_inner(&self, job_id: String) {
let alive_executors = self.get_alive_executors_within_one_minute();
for executor in alive_executors {
let job_id_clone = job_id.to_owned();
if let Ok(mut client) = self.get_client(&executor).await {
tokio::spawn(async move {
if let Err(err) = client
.remove_job_data(RemoveJobDataParams {
job_id: job_id_clone,
})
.await
{
warn!(
"Failed to call remove_job_data on Executor {} due to {:?}",
executor, err
)
}
});
} else {
warn!("Failed to get client for Executor {}", executor)
}
}
}
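
    /// Get a gRPC client for the given executor, creating and caching a new connection
    /// if one does not already exist.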
pub async fn get_client(
&self,
executor_id: &str,
) -> Result<ExecutorGrpcClient<Channel>> {
let client = self.clients.get(executor_id).map(|value| value.clone());
if let Some(client) = client {
Ok(client)
} else {
let executor_metadata = self.get_executor_metadata(executor_id).await?;
let executor_url = format!(
"http://{}:{}",
executor_metadata.host, executor_metadata.grpc_port
);
let connection = create_grpc_client_connection(executor_url).await?;
let client = ExecutorGrpcClient::new(connection);
            self.clients.insert(executor_id.to_owned(), client.clone());
Ok(client)
}
}
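
    /// Return the metadata of every executor with a cached heartbeat, together with
    /// the timestamp of its last heartbeat expressed as a `Duration` since the UNIX epoch.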
pub async fn get_executor_state(&self) -> Result<Vec<(ExecutorMetadata, Duration)>> {
let heartbeat_timestamps: Vec<(String, u64)> = {
self.executors_heartbeat
.iter()
.map(|item| {
let (executor_id, heartbeat) = item.pair();
(executor_id.clone(), heartbeat.timestamp)
})
.collect()
};
let mut state: Vec<(ExecutorMetadata, Duration)> = vec![];
for (executor_id, ts) in heartbeat_timestamps {
let duration = Duration::from_secs(ts);
let metadata = self.get_executor_metadata(&executor_id).await?;
state.push((metadata, duration));
}
Ok(state)
}
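
    /// Get the metadata for the given executor, preferring the in-memory cache and
    /// falling back to the state backend.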
pub async fn get_executor_metadata(
&self,
executor_id: &str,
) -> Result<ExecutorMetadata> {
{
if let Some(cached) = self.executor_metadata.get(executor_id) {
return Ok(cached.clone());
}
}
let value = self.state.get(Keyspace::Executors, executor_id).await?;
let decoded =
decode_into::<protobuf::ExecutorMetadata, ExecutorMetadata>(&value)?;
Ok(decoded)
}
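
    /// Persist executor metadata to the state backend.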
pub async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
let executor_id = metadata.id.clone();
let proto: protobuf::ExecutorMetadata = metadata.into();
let value = encode_protobuf(&proto)?;
self.state
.put(Keyspace::Executors, executor_id, value)
.await
}
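
    /// Register a new executor. When `reserve` is true, all of the executor's task
    /// slots are reserved immediately and returned to the caller; otherwise the slots
    /// are published as available and an empty vector is returned.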
pub async fn register_executor(
&self,
metadata: ExecutorMetadata,
specification: ExecutorData,
reserve: bool,
) -> Result<Vec<ExecutorReservation>> {
self.test_scheduler_connectivity(&metadata).await?;
let executor_id = metadata.id.clone();
let current_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| {
BallistaError::Internal(format!(
"Error getting current timestamp: {:?}",
e
))
})?
.as_secs();
self.save_executor_metadata(metadata).await?;
self.save_executor_heartbeat(protobuf::ExecutorHeartbeat {
executor_id: executor_id.clone(),
timestamp: current_ts,
metrics: vec![],
status: Some(ExecutorStatus {
status: Some(executor_status::Status::Active("".to_string())),
}),
})
.await?;
if !reserve {
if self.slots_policy.is_local() {
let mut executor_data = self.executor_data.lock();
executor_data
.insert(specification.executor_id.clone(), specification.clone());
}
let proto: protobuf::ExecutorData = specification.into();
let value = encode_protobuf(&proto)?;
self.state.put(Keyspace::Slots, executor_id, value).await?;
Ok(vec![])
} else {
let mut specification = specification;
let num_slots = specification.available_task_slots as usize;
let mut reservations: Vec<ExecutorReservation> = vec![];
for _ in 0..num_slots {
reservations.push(ExecutorReservation::new_free(executor_id.clone()));
}
specification.available_task_slots = 0;
if self.slots_policy.is_local() {
let mut executor_data = self.executor_data.lock();
executor_data
.insert(specification.executor_id.clone(), specification.clone());
}
let proto: protobuf::ExecutorData = specification.into();
let value = encode_protobuf(&proto)?;
self.state.put(Keyspace::Slots, executor_id, value).await?;
Ok(reservations)
}
}
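
    /// Mark the executor as dead by recording a `Dead` heartbeat for it.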
pub async fn remove_executor(
&self,
executor_id: &str,
_reason: Option<String>,
) -> Result<()> {
let current_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| {
BallistaError::Internal(format!(
"Error getting current timestamp: {:?}",
e
))
})?
.as_secs();
self.save_dead_executor_heartbeat(protobuf::ExecutorHeartbeat {
executor_id: executor_id.to_owned(),
timestamp: current_ts,
metrics: vec![],
status: Some(ExecutorStatus {
status: Some(executor_status::Status::Dead("".to_string())),
}),
})
.await?;
Ok(())
}
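
    /// Verify that the scheduler can open a gRPC connection to the executor before
    /// accepting its registration.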
#[cfg(not(test))]
async fn test_scheduler_connectivity(
&self,
metadata: &ExecutorMetadata,
) -> Result<()> {
let executor_url = format!("http://{}:{}", metadata.host, metadata.grpc_port);
debug!("Connecting to executor {:?}", executor_url);
let _ = protobuf::executor_grpc_client::ExecutorGrpcClient::connect(executor_url)
.await
.map_err(|e| {
BallistaError::Internal(format!(
"Failed to register executor at {}:{}, could not connect: {:?}",
metadata.host, metadata.grpc_port, e
))
})?;
Ok(())
}
#[cfg(test)]
async fn test_scheduler_connectivity(
&self,
_metadata: &ExecutorMetadata,
) -> Result<()> {
Ok(())
}
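
    /// Persist an executor heartbeat to the state backend and update the in-memory cache.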
pub(crate) async fn save_executor_heartbeat(
&self,
heartbeat: protobuf::ExecutorHeartbeat,
) -> Result<()> {
let executor_id = heartbeat.executor_id.clone();
let value = encode_protobuf(&heartbeat)?;
self.state
.put(Keyspace::Heartbeats, executor_id, value)
.await?;
self.executors_heartbeat
.insert(heartbeat.executor_id.clone(), heartbeat);
Ok(())
}
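
    /// Record a `Dead` heartbeat for the executor and drop it from the in-memory
    /// heartbeat and slot caches.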
pub(crate) async fn save_dead_executor_heartbeat(
&self,
heartbeat: protobuf::ExecutorHeartbeat,
) -> Result<()> {
let executor_id = heartbeat.executor_id.clone();
let value = encode_protobuf(&heartbeat)?;
self.state
.put(Keyspace::Heartbeats, executor_id.clone(), value)
.await?;
        self.executors_heartbeat.remove(&executor_id);
{
let mut executor_data = self.executor_data.lock();
executor_data.remove(&executor_id);
}
self.dead_executors.insert(executor_id);
Ok(())
}
pub(crate) fn is_dead_executor(&self, executor_id: &str) -> bool {
self.dead_executors.contains(executor_id)
}
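
    /// Populate the in-memory heartbeat cache with all executors currently marked
    /// active in the state backend.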
async fn init_active_executor_heartbeats(&self) -> Result<()> {
let heartbeats = self.state.scan(Keyspace::Heartbeats, None).await?;
for (_, value) in heartbeats {
let data: protobuf::ExecutorHeartbeat = decode_protobuf(&value)?;
let executor_id = data.executor_id.clone();
if let Some(ExecutorStatus {
status: Some(executor_status::Status::Active(_)),
}) = data.status
{
self.executors_heartbeat.insert(executor_id, data);
}
}
Ok(())
}
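
    /// Return the IDs of executors whose last heartbeat is newer than the given epoch
    /// timestamp (in seconds).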
pub(crate) fn get_alive_executors(
&self,
last_seen_ts_threshold: u64,
) -> HashSet<String> {
self.executors_heartbeat
.iter()
.filter_map(|pair| {
let (exec, heartbeat) = pair.pair();
(heartbeat.timestamp > last_seen_ts_threshold).then(|| exec.clone())
})
.collect()
}
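
    /// Return the heartbeats of executors that have not reported within
    /// `DEFAULT_EXECUTOR_TIMEOUT_SECONDS`.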
pub(crate) fn get_expired_executors(&self) -> Vec<ExecutorHeartbeat> {
let now_epoch_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
let last_seen_threshold = now_epoch_ts
.checked_sub(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS))
.unwrap_or_else(|| Duration::from_secs(0))
.as_secs();
        self.executors_heartbeat
            .iter()
            .filter_map(|pair| {
                let (_exec, heartbeat) = pair.pair();
                (heartbeat.timestamp <= last_seen_threshold).then(|| heartbeat.clone())
            })
            .collect::<Vec<_>>()
}
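
    /// Return the IDs of executors that have sent a heartbeat within the last minute.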
pub(crate) fn get_alive_executors_within_one_minute(&self) -> HashSet<String> {
let now_epoch_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
let last_seen_threshold = now_epoch_ts
.checked_sub(Duration::from_secs(60))
.unwrap_or_else(|| Duration::from_secs(0));
self.get_alive_executors(last_seen_threshold.as_secs())
}
}
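
/// Watches the `Heartbeats` keyspace and keeps the shared heartbeat and dead-executor
/// caches up to date.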
struct ExecutorHeartbeatListener {
state: Arc<dyn StateBackendClient>,
executors_heartbeat: Arc<DashMap<String, protobuf::ExecutorHeartbeat>>,
dead_executors: Arc<DashSet<String>>,
}
impl ExecutorHeartbeatListener {
pub fn new(
state: Arc<dyn StateBackendClient>,
executors_heartbeat: Arc<DashMap<String, protobuf::ExecutorHeartbeat>>,
dead_executors: Arc<DashSet<String>>,
) -> Self {
Self {
state,
executors_heartbeat,
dead_executors,
}
}
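
    /// Spawn a background task that applies heartbeat watch events to the shared caches.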
pub async fn start(&self) -> Result<()> {
let mut watch = self
.state
.watch(Keyspace::Heartbeats, "".to_owned())
.await?;
let heartbeats = self.executors_heartbeat.clone();
let dead_executors = self.dead_executors.clone();
tokio::task::spawn(async move {
while let Some(event) = watch.next().await {
if let WatchEvent::Put(_, value) = event {
if let Ok(data) =
decode_protobuf::<protobuf::ExecutorHeartbeat>(&value)
{
let executor_id = data.executor_id.clone();
if let Some(ExecutorStatus {
status: Some(executor_status::Status::Dead(_)),
}) = data.status
{
heartbeats.remove(&executor_id);
dead_executors.insert(executor_id);
} else {
heartbeats.insert(executor_id, data);
}
}
}
}
});
Ok(())
}
}
#[cfg(test)]
mod test {
use crate::config::SlotsPolicy;
use crate::state::backend::standalone::StandaloneClient;
use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
use ballista_core::error::Result;
use ballista_core::serde::scheduler::{
ExecutorData, ExecutorMetadata, ExecutorSpecification,
};
use std::sync::Arc;
#[tokio::test]
async fn test_reserve_and_cancel() -> Result<()> {
test_reserve_and_cancel_inner(SlotsPolicy::Bias).await?;
test_reserve_and_cancel_inner(SlotsPolicy::RoundRobin).await?;
test_reserve_and_cancel_inner(SlotsPolicy::RoundRobinLocal).await?;
Ok(())
}
async fn test_reserve_and_cancel_inner(slots_policy: SlotsPolicy) -> Result<()> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let executor_manager = ExecutorManager::new(state_storage, slots_policy);
let executors = test_executors(10, 4);
for (executor_metadata, executor_data) in executors {
executor_manager
.register_executor(executor_metadata, executor_data, false)
.await?;
}
let reservations = executor_manager.reserve_slots(40).await?;
assert_eq!(reservations.len(), 40);
executor_manager.cancel_reservations(reservations).await?;
let reservations = executor_manager.reserve_slots(40).await?;
assert_eq!(reservations.len(), 40);
Ok(())
}
#[tokio::test]
async fn test_reserve_partial() -> Result<()> {
test_reserve_partial_inner(SlotsPolicy::Bias).await?;
test_reserve_partial_inner(SlotsPolicy::RoundRobin).await?;
test_reserve_partial_inner(SlotsPolicy::RoundRobinLocal).await?;
Ok(())
}
async fn test_reserve_partial_inner(slots_policy: SlotsPolicy) -> Result<()> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let executor_manager = ExecutorManager::new(state_storage, slots_policy);
let executors = test_executors(10, 4);
for (executor_metadata, executor_data) in executors {
executor_manager
.register_executor(executor_metadata, executor_data, false)
.await?;
}
let reservations = executor_manager.reserve_slots(30).await?;
assert_eq!(reservations.len(), 30);
let more_reservations = executor_manager.reserve_slots(30).await?;
assert_eq!(more_reservations.len(), 10);
executor_manager.cancel_reservations(reservations).await?;
executor_manager
.cancel_reservations(more_reservations)
.await?;
let reservations = executor_manager.reserve_slots(40).await?;
assert_eq!(reservations.len(), 40);
let more_reservations = executor_manager.reserve_slots(30).await?;
assert_eq!(more_reservations.len(), 0);
Ok(())
}
#[tokio::test]
async fn test_reserve_concurrent() -> Result<()> {
test_reserve_concurrent_inner(SlotsPolicy::Bias).await?;
test_reserve_concurrent_inner(SlotsPolicy::RoundRobin).await?;
test_reserve_concurrent_inner(SlotsPolicy::RoundRobinLocal).await?;
Ok(())
}
async fn test_reserve_concurrent_inner(slots_policy: SlotsPolicy) -> Result<()> {
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<Result<Vec<ExecutorReservation>>>(1000);
let executors = test_executors(10, 4);
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let executor_manager = ExecutorManager::new(state_storage, slots_policy);
for (executor_metadata, executor_data) in executors {
executor_manager
.register_executor(executor_metadata, executor_data, false)
.await?;
}
{
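            // Move the sender into this block so the original handle is dropped once all
            // tasks are spawned; `recv` then returns `None` after every clone is dropped.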
let sender = sender;
for _ in 0..20 {
let executor_manager = executor_manager.clone();
let sender = sender.clone();
tokio::task::spawn(async move {
let reservations = executor_manager.reserve_slots(40).await;
sender.send(reservations).await.unwrap();
});
}
}
let mut total_reservations: Vec<ExecutorReservation> = vec![];
while let Some(Ok(reservations)) = receiver.recv().await {
total_reservations.extend(reservations);
}
assert_eq!(total_reservations.len(), 40);
Ok(())
}
#[tokio::test]
async fn test_register_reserve() -> Result<()> {
test_register_reserve_inner(SlotsPolicy::Bias).await?;
test_register_reserve_inner(SlotsPolicy::RoundRobin).await?;
test_register_reserve_inner(SlotsPolicy::RoundRobinLocal).await?;
Ok(())
}
async fn test_register_reserve_inner(slots_policy: SlotsPolicy) -> Result<()> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let executor_manager = ExecutorManager::new(state_storage, slots_policy);
let executors = test_executors(10, 4);
for (executor_metadata, executor_data) in executors {
let reservations = executor_manager
.register_executor(executor_metadata, executor_data, true)
.await?;
assert_eq!(reservations.len(), 4);
}
let reservations = executor_manager.reserve_slots(1).await?;
assert_eq!(reservations.len(), 0);
Ok(())
}
fn test_executors(
total_executors: usize,
slots_per_executor: u32,
) -> Vec<(ExecutorMetadata, ExecutorData)> {
let mut result: Vec<(ExecutorMetadata, ExecutorData)> = vec![];
for i in 0..total_executors {
result.push((
ExecutorMetadata {
id: format!("executor-{}", i),
host: format!("host-{}", i),
port: 8080,
grpc_port: 9090,
specification: ExecutorSpecification {
task_slots: slots_per_executor,
},
},
ExecutorData {
executor_id: format!("executor-{}", i),
total_task_slots: slots_per_executor,
available_task_slots: slots_per_executor,
},
));
}
result
}
}