use async_trait::async_trait;
use rakka_core::actor::{Actor, Context};
use tokio::sync::oneshot;
use inference_core::deployment::Deployment;
use inference_core::runtime::{ProviderKind, TransportKind};
/// Candidate nodes handed to the placement actor alongside a deployment.
#[derive(Clone, Debug)]
pub struct PlacementConstraints {
    /// Nodes that replicas are spread across when the deployment resolves to
    /// `TransportKind::LocalGpu`.
    pub gpu_nodes: Vec<String>,
    /// Nodes that replicas are spread across when the deployment resolves to
    /// `TransportKind::RemoteNetwork`.
    pub egress_nodes: Vec<String>,
    /// Node lists keyed by `(provider, String)`.
    ///
    /// NOTE(review): nothing visible in this file reads this map, and the
    /// meaning of the `String` half of the key is not shown here — confirm
    /// both against the callers that populate it.
    pub egress_preference: std::collections::HashMap<(ProviderKind, String), Vec<String>>,
}
/// Successful outcome of placing one deployment.
#[derive(Clone, Debug)]
pub struct PlacementResult {
    /// Name of the deployment that was placed (copied from `Deployment::name`).
    pub deployment: String,
    /// One entry per replica, in ascending `replica_index` order.
    pub assignments: Vec<NodeAssignment>,
}
/// Where a single replica of a deployment has been placed.
#[derive(Clone, Debug)]
pub struct NodeAssignment {
    /// Zero-based index of the replica within its deployment.
    pub replica_index: u32,
    /// Name of the node chosen for this replica.
    pub node: String,
    /// GPU indices given to the replica on `node`; empty for replicas placed
    /// on egress nodes.
    pub gpus: Vec<u32>,
}
/// Reasons a placement request can be rejected.
#[derive(Debug, thiserror::Error)]
pub enum PlacementError {
    /// A local-GPU deployment was requested but the constraints list no GPU
    /// nodes.
    #[error("no GPU nodes available")]
    NoGpuNodes,

    /// A remote-provider deployment was requested but the constraints list no
    /// egress nodes; carries the provider that was being placed.
    #[error("no egress nodes available for provider {0:?}")]
    NoEgressNodes(ProviderKind),

    /// No node has `requested` free GPUs.
    ///
    /// NOTE(review): nothing visible in this file constructs this variant.
    #[error("deployment requires {requested} GPUs but no node has that many free")]
    NotEnoughGpus {
        /// Number of GPUs the deployment asked for.
        requested: u32,
    },
}
/// Messages understood by the deployment placement actor.
pub enum PlacementMsg {
    /// Request a placement for `deployment` under `constraints`.
    Place {
        /// Deployment whose replicas should be assigned to nodes.
        deployment: Deployment,
        /// Candidate nodes the placement may choose from.
        constraints: PlacementConstraints,
        /// One-shot channel on which the placement outcome is sent back.
        reply: oneshot::Sender<Result<PlacementResult, PlacementError>>,
    },
}
/// Stateless actor that decides which node each replica of a deployment
/// runs on.
///
/// The type has no fields, so `Default` is derived rather than written by
/// hand.
#[derive(Default)]
pub struct DeploymentPlacementActor;
impl DeploymentPlacementActor {
pub fn new() -> Self {
Self
}
fn place(
&self,
deployment: &Deployment,
constraints: &PlacementConstraints,
) -> Result<PlacementResult, PlacementError> {
let kind = deployment.effective_runtime();
let transport: TransportKind = (&kind).into();
let mut assignments = Vec::with_capacity(deployment.replicas as usize);
match transport {
TransportKind::LocalGpu => {
if constraints.gpu_nodes.is_empty() {
return Err(PlacementError::NoGpuNodes);
}
let want = deployment.gpus.unwrap_or(1);
for i in 0..deployment.replicas {
let node = constraints.gpu_nodes[(i as usize) % constraints.gpu_nodes.len()].clone();
let gpus = (0..want).collect();
assignments.push(NodeAssignment {
replica_index: i,
node,
gpus,
});
}
}
TransportKind::RemoteNetwork { provider } => {
if constraints.egress_nodes.is_empty() {
return Err(PlacementError::NoEgressNodes(provider));
}
for i in 0..deployment.replicas {
let node =
constraints.egress_nodes[(i as usize) % constraints.egress_nodes.len()].clone();
assignments.push(NodeAssignment {
replica_index: i,
node,
gpus: vec![],
});
}
}
}
Ok(PlacementResult {
deployment: deployment.name.clone(),
assignments,
})
}
}
#[async_trait]
impl Actor for DeploymentPlacementActor {
    type Msg = PlacementMsg;

    /// Handles one [`PlacementMsg`]: computes the placement synchronously and
    /// sends the outcome back on the message's reply channel.
    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Self::Msg) {
        match msg {
            PlacementMsg::Place {
                deployment,
                constraints,
                reply,
            } => {
                let outcome = self.place(&deployment, &constraints);
                // A failed send means the requester dropped its receiver;
                // there is nobody left to notify, so the error is ignored.
                let _ = reply.send(outcome);
            }
        }
    }
}