// atomr_infer_runtime/placement.rs

use async_trait::async_trait;
use atomr_core::actor::{Actor, Context};
use tokio::sync::oneshot;

use atomr_infer_core::deployment::Deployment;
use atomr_infer_core::runtime::{ProviderKind, TransportKind};
/// Cluster-level inputs that constrain where deployment replicas may land.
#[derive(Debug, Clone)]
pub struct PlacementConstraints {
    /// Names of nodes that expose local GPUs; local-GPU replicas are
    /// spread round-robin across this list.
    pub gpu_nodes: Vec<String>,
    /// Names of nodes permitted to make remote (egress) provider calls;
    /// remote-network replicas are spread round-robin across this list.
    pub egress_nodes: Vec<String>,
    /// Preferred egress nodes per `(provider, key)` pair.
    /// NOTE(review): the meaning of the `String` key is not visible in this
    /// file — presumably a model or region identifier; confirm with callers.
    /// Also note this field is never read by `place` as written here.
    pub egress_preference: std::collections::HashMap<(ProviderKind, String), Vec<String>>,
}
37
/// The outcome of a successful placement: one node assignment per replica.
#[derive(Debug, Clone)]
pub struct PlacementResult {
    /// Name of the deployment these assignments belong to
    /// (copied from `Deployment::name`).
    pub deployment: String,
    /// One entry per replica, in replica-index order.
    pub assignments: Vec<NodeAssignment>,
}
43
/// Where a single replica runs and which GPU indices it was given.
#[derive(Debug, Clone)]
pub struct NodeAssignment {
    /// Zero-based index of the replica within its deployment.
    pub replica_index: u32,
    /// Name of the node hosting this replica.
    pub node: String,
    /// GPU indices assigned on that node; empty for remote-network replicas.
    pub gpus: Vec<u32>,
}
50
/// Errors that placement can report back to the requester.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum PlacementError {
    /// The constraints listed no GPU-capable nodes at all.
    #[error("no GPU nodes available")]
    NoGpuNodes,
    /// The constraints listed no egress nodes for the given provider.
    #[error("no egress nodes available for provider {0:?}")]
    NoEgressNodes(ProviderKind),
    /// A deployment asked for more GPUs than any node can supply.
    /// NOTE(review): nothing visible in this file constructs this variant —
    /// confirm whether free-GPU accounting was removed or lives elsewhere.
    #[error("deployment requires {requested} GPUs but no node has that many free")]
    NotEnoughGpus { requested: u32 },
    /// `TransportKind` has a variant this crate was not compiled to handle.
    #[error("unknown TransportKind variant — atomr-infer was built against a newer atomr-infer-core")]
    UnknownTransport,
}
63
/// Messages accepted by [`DeploymentPlacementActor`].
pub enum PlacementMsg {
    /// Compute placements for `deployment` under `constraints` and send the
    /// result back on `reply`.
    Place {
        deployment: Deployment,
        constraints: PlacementConstraints,
        /// One-shot reply channel; a dropped receiver is tolerated by the actor.
        reply: oneshot::Sender<Result<PlacementResult, PlacementError>>,
    },
}
71
/// Actor that assigns deployment replicas to cluster nodes.
///
/// The actor is stateless: each `Place` request is answered purely from the
/// deployment spec and constraints carried in the message.
// Manual `impl Default` for a unit struct is derivable (clippy
// `derivable_impls`); deriving also picks up `Debug`/`Clone`/`Copy`, which
// are free for a zero-sized type and backward-compatible additions.
#[derive(Debug, Default, Clone, Copy)]
pub struct DeploymentPlacementActor;
79
80impl DeploymentPlacementActor {
81 pub fn new() -> Self {
82 Self
83 }
84
85 fn place(
86 &self,
87 deployment: &Deployment,
88 constraints: &PlacementConstraints,
89 ) -> Result<PlacementResult, PlacementError> {
90 let kind = deployment.effective_runtime();
91 let transport: TransportKind = (&kind).into();
92 let mut assignments = Vec::with_capacity(deployment.replicas as usize);
93 match transport {
94 TransportKind::LocalGpu => {
95 if constraints.gpu_nodes.is_empty() {
96 return Err(PlacementError::NoGpuNodes);
97 }
98 let want = deployment.gpus.unwrap_or(1);
99 for i in 0..deployment.replicas {
100 let node = constraints.gpu_nodes[(i as usize) % constraints.gpu_nodes.len()].clone();
101 let gpus = (0..want).collect();
108 assignments.push(NodeAssignment {
109 replica_index: i,
110 node,
111 gpus,
112 });
113 }
114 }
115 TransportKind::RemoteNetwork { provider } => {
116 if constraints.egress_nodes.is_empty() {
117 return Err(PlacementError::NoEgressNodes(provider));
118 }
119 for i in 0..deployment.replicas {
120 let node =
121 constraints.egress_nodes[(i as usize) % constraints.egress_nodes.len()].clone();
122 assignments.push(NodeAssignment {
123 replica_index: i,
124 node,
125 gpus: vec![],
126 });
127 }
128 }
129 _ => return Err(PlacementError::UnknownTransport),
133 }
134 Ok(PlacementResult {
135 deployment: deployment.name.clone(),
136 assignments,
137 })
138 }
139}
140
141#[async_trait]
142impl Actor for DeploymentPlacementActor {
143 type Msg = PlacementMsg;
144
145 async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Self::Msg) {
146 match msg {
147 PlacementMsg::Place {
148 deployment,
149 constraints,
150 reply,
151 } => {
152 let _ = reply.send(self.place(&deployment, &constraints));
153 }
154 }
155 }
156}