//! inference_runtime/placement.rs — deployment placement actor.

use async_trait::async_trait;
use rakka_core::actor::{Actor, Context};
use tokio::sync::oneshot;

use inference_core::deployment::Deployment;
use inference_core::runtime::{ProviderKind, TransportKind};
/// Cluster-level inputs that bound where replicas may be placed.
#[derive(Debug, Clone)]
pub struct PlacementConstraints {
    /// Names of nodes with local GPUs available for `LocalGpu` deployments.
    pub gpu_nodes: Vec<String>,
    /// Names of nodes with outbound network access, used for
    /// `RemoteNetwork` (provider-backed) deployments.
    pub egress_nodes: Vec<String>,
    /// Preferred egress nodes keyed by provider plus a string qualifier.
    /// NOTE(review): the String key's meaning (model? region?) is not shown
    /// here, and this map is not consulted by the visible `place()` logic —
    /// confirm intended use against callers.
    pub egress_preference: std::collections::HashMap<(ProviderKind, String), Vec<String>>,
}
37
/// The outcome of a successful placement: one assignment per replica.
#[derive(Debug, Clone)]
pub struct PlacementResult {
    /// Name of the deployment these assignments belong to.
    pub deployment: String,
    /// One entry per replica, in replica-index order.
    pub assignments: Vec<NodeAssignment>,
}
43
/// Where a single replica runs and which local GPU indices it may use.
#[derive(Debug, Clone)]
pub struct NodeAssignment {
    /// Zero-based index of the replica within its deployment.
    pub replica_index: u32,
    /// Name of the node the replica is placed on.
    pub node: String,
    /// Local GPU indices granted to the replica; empty for
    /// network-backed (remote provider) deployments.
    pub gpus: Vec<u32>,
}
50
/// Reasons a placement request can fail.
#[derive(Debug, thiserror::Error)]
pub enum PlacementError {
    /// A `LocalGpu` deployment was requested but no GPU nodes exist.
    #[error("no GPU nodes available")]
    NoGpuNodes,
    /// A `RemoteNetwork` deployment was requested but no egress nodes exist.
    #[error("no egress nodes available for provider {0:?}")]
    NoEgressNodes(ProviderKind),
    /// NOTE(review): not produced by the visible `place()` path — presumably
    /// emitted by a per-node capacity check elsewhere; confirm it is reachable.
    #[error("deployment requires {requested} GPUs but no node has that many free")]
    NotEnoughGpus { requested: u32 },
}
60
/// Messages accepted by [`DeploymentPlacementActor`].
pub enum PlacementMsg {
    /// Compute a placement for `deployment` under `constraints`;
    /// the result is delivered on `reply` (a dropped receiver is ignored).
    Place {
        deployment: Deployment,
        constraints: PlacementConstraints,
        reply: oneshot::Sender<Result<PlacementResult, PlacementError>>,
    },
}
68
/// Stateless actor that maps deployment replicas onto cluster nodes.
///
/// The hand-written `impl Default` was exactly what the compiler generates
/// for a unit struct (Clippy: `derivable_impls`), so it is replaced by a
/// derive; `Debug`, `Clone`, and `Copy` are free for a zero-sized type.
#[derive(Debug, Default, Clone, Copy)]
pub struct DeploymentPlacementActor;
76
77impl DeploymentPlacementActor {
78 pub fn new() -> Self {
79 Self
80 }
81
82 fn place(
83 &self,
84 deployment: &Deployment,
85 constraints: &PlacementConstraints,
86 ) -> Result<PlacementResult, PlacementError> {
87 let kind = deployment.effective_runtime();
88 let transport: TransportKind = (&kind).into();
89 let mut assignments = Vec::with_capacity(deployment.replicas as usize);
90 match transport {
91 TransportKind::LocalGpu => {
92 if constraints.gpu_nodes.is_empty() {
93 return Err(PlacementError::NoGpuNodes);
94 }
95 let want = deployment.gpus.unwrap_or(1);
96 for i in 0..deployment.replicas {
97 let node = constraints.gpu_nodes[(i as usize) % constraints.gpu_nodes.len()].clone();
98 let gpus = (0..want).collect();
105 assignments.push(NodeAssignment {
106 replica_index: i,
107 node,
108 gpus,
109 });
110 }
111 }
112 TransportKind::RemoteNetwork { provider } => {
113 if constraints.egress_nodes.is_empty() {
114 return Err(PlacementError::NoEgressNodes(provider));
115 }
116 for i in 0..deployment.replicas {
117 let node =
118 constraints.egress_nodes[(i as usize) % constraints.egress_nodes.len()].clone();
119 assignments.push(NodeAssignment {
120 replica_index: i,
121 node,
122 gpus: vec![],
123 });
124 }
125 }
126 }
127 Ok(PlacementResult {
128 deployment: deployment.name.clone(),
129 assignments,
130 })
131 }
132}
133
134#[async_trait]
135impl Actor for DeploymentPlacementActor {
136 type Msg = PlacementMsg;
137
138 async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Self::Msg) {
139 match msg {
140 PlacementMsg::Place {
141 deployment,
142 constraints,
143 reply,
144 } => {
145 let _ = reply.send(self.place(&deployment, &constraints));
146 }
147 }
148 }
149}