// inference_runtime/deployment_manager.rs

//! `DeploymentManagerActor` — cluster-singleton owner of the deployment
//! catalog. Doc §4. Manages create/update/delete and surfaces the
//! current set to the gateway and `DpCoordinatorActor`.

5use std::collections::HashMap;
6
7use async_trait::async_trait;
8use rakka_core::actor::{Actor, Context};
9use tokio::sync::oneshot;
10
11use inference_core::deployment::Deployment;
12use inference_core::error::InferenceError;
13
14#[derive(Debug, Clone)]
15pub struct DeploymentRecord {
16    pub deployment: Deployment,
17    pub state: DeploymentState,
18}
19
/// Lifecycle state attached to each [`DeploymentRecord`].
///
/// Within this file only `Pending` is ever assigned (when a deployment is
/// applied). NOTE(review): transitions to the other states presumably
/// happen elsewhere; confirm against their callers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeploymentState {
    /// Accepted into the catalog.
    Pending,
    /// Presumably live and taking traffic; confirm.
    Serving,
    /// Presumably winding down before removal; confirm.
    Draining,
    /// Presumably a terminal error state; confirm.
    Failed,
}

28pub enum DeploymentManagerMsg {
29    Apply {
30        deployment: Deployment,
31        reply: oneshot::Sender<Result<(), InferenceError>>,
32    },
33    Remove {
34        name: String,
35        reply: oneshot::Sender<Result<(), InferenceError>>,
36    },
37    List {
38        reply: oneshot::Sender<Vec<DeploymentRecord>>,
39    },
40    Get {
41        name: String,
42        reply: oneshot::Sender<Option<DeploymentRecord>>,
43    },
44}
45
46#[derive(Default)]
47pub struct DeploymentManagerActor {
48    records: HashMap<String, DeploymentRecord>,
49}
50
51impl DeploymentManagerActor {
52    pub fn new() -> Self {
53        Self::default()
54    }
55}
56
57#[async_trait]
58impl Actor for DeploymentManagerActor {
59    type Msg = DeploymentManagerMsg;
60
61    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Self::Msg) {
62        match msg {
63            DeploymentManagerMsg::Apply { deployment, reply } => {
64                let res = match deployment.validate() {
65                    Ok(()) => {
66                        let name = deployment.name.clone();
67                        self.records.insert(
68                            name,
69                            DeploymentRecord {
70                                deployment,
71                                state: DeploymentState::Pending,
72                            },
73                        );
74                        Ok(())
75                    }
76                    Err(e) => Err(InferenceError::Internal(e.to_string())),
77                };
78                let _ = reply.send(res);
79            }
80            DeploymentManagerMsg::Remove { name, reply } => {
81                self.records.remove(&name);
82                let _ = reply.send(Ok(()));
83            }
84            DeploymentManagerMsg::List { reply } => {
85                let _ = reply.send(self.records.values().cloned().collect());
86            }
87            DeploymentManagerMsg::Get { name, reply } => {
88                let _ = reply.send(self.records.get(&name).cloned());
89            }
90        }
91    }
92}