inference_runtime/
deployment_manager.rs1use std::collections::HashMap;
6
7use async_trait::async_trait;
8use rakka_core::actor::{Actor, Context};
9use tokio::sync::oneshot;
10
11use inference_core::deployment::Deployment;
12use inference_core::error::InferenceError;
13
14#[derive(Debug, Clone)]
15pub struct DeploymentRecord {
16 pub deployment: Deployment,
17 pub state: DeploymentState,
18}
19
/// State label attached to a [`DeploymentRecord`].
///
/// The type is a plain `Copy` tag with value equality, so it can be compared and
/// passed around freely.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeploymentState {
    /// State given to a record when its deployment is applied.
    Pending,
    /// NOTE(review): never assigned in this file; presumably "actively serving
    /// traffic" — confirm against the code that drives transitions.
    Serving,
    /// NOTE(review): never assigned in this file; presumably "being wound down" —
    /// confirm against the code that drives transitions.
    Draining,
    /// NOTE(review): never assigned in this file; presumably "could not be brought
    /// up or kept running" — confirm against the code that drives transitions.
    Failed,
}
27
28pub enum DeploymentManagerMsg {
29 Apply {
30 deployment: Deployment,
31 reply: oneshot::Sender<Result<(), InferenceError>>,
32 },
33 Remove {
34 name: String,
35 reply: oneshot::Sender<Result<(), InferenceError>>,
36 },
37 List {
38 reply: oneshot::Sender<Vec<DeploymentRecord>>,
39 },
40 Get {
41 name: String,
42 reply: oneshot::Sender<Option<DeploymentRecord>>,
43 },
44}
45
46#[derive(Default)]
47pub struct DeploymentManagerActor {
48 records: HashMap<String, DeploymentRecord>,
49}
50
51impl DeploymentManagerActor {
52 pub fn new() -> Self {
53 Self::default()
54 }
55}
56
57#[async_trait]
58impl Actor for DeploymentManagerActor {
59 type Msg = DeploymentManagerMsg;
60
61 async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Self::Msg) {
62 match msg {
63 DeploymentManagerMsg::Apply { deployment, reply } => {
64 let res = match deployment.validate() {
65 Ok(()) => {
66 let name = deployment.name.clone();
67 self.records.insert(
68 name,
69 DeploymentRecord {
70 deployment,
71 state: DeploymentState::Pending,
72 },
73 );
74 Ok(())
75 }
76 Err(e) => Err(InferenceError::Internal(e.to_string())),
77 };
78 let _ = reply.send(res);
79 }
80 DeploymentManagerMsg::Remove { name, reply } => {
81 self.records.remove(&name);
82 let _ = reply.send(Ok(()));
83 }
84 DeploymentManagerMsg::List { reply } => {
85 let _ = reply.send(self.records.values().cloned().collect());
86 }
87 DeploymentManagerMsg::Get { name, reply } => {
88 let _ = reply.send(self.records.get(&name).cloned());
89 }
90 }
91 }
92}