Skip to main content

atomr_infer_runtime/
deployment_manager.rs

1//! `DeploymentManagerActor` — cluster-singleton owner of the deployment
2//! catalog. Doc §4. Manages create/update/delete and surfaces the
3//! current set to the gateway and `DpCoordinatorActor`.
4
5use std::collections::HashMap;
6
7use async_trait::async_trait;
8use atomr_core::actor::{Actor, Context};
9use tokio::sync::oneshot;
10
11use atomr_infer_core::deployment::Deployment;
12use atomr_infer_core::error::InferenceError;
13
14#[derive(Debug, Clone)]
15pub struct DeploymentRecord {
16    pub deployment: Deployment,
17    pub state: DeploymentState,
18}
19
/// Lifecycle phase recorded for a deployment in the catalog.
///
/// Within this file only `Pending` is ever assigned (when a deployment is
/// applied). NOTE(review): where the other phases get set is not visible
/// here — confirm against the rest of the runtime.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeploymentState {
    /// State given to a record when its deployment is applied.
    Pending,
    /// NOTE(review): presumably the deployment is live and serving — confirm.
    Serving,
    /// NOTE(review): presumably the deployment is being wound down — confirm.
    Draining,
    /// NOTE(review): presumably the deployment could not be brought up — confirm.
    Failed,
}
27
28// `Apply` carries a full `Deployment` (~248B) while the other
29// variants are name-string-shaped. Boxing would force every caller
30// to wrap, which is more friction than the size penalty is worth for
31// a short-lived mailbox message.
32#[allow(clippy::large_enum_variant)]
33pub enum DeploymentManagerMsg {
34    Apply {
35        deployment: Deployment,
36        reply: oneshot::Sender<Result<(), InferenceError>>,
37    },
38    Remove {
39        name: String,
40        reply: oneshot::Sender<Result<(), InferenceError>>,
41    },
42    List {
43        reply: oneshot::Sender<Vec<DeploymentRecord>>,
44    },
45    Get {
46        name: String,
47        reply: oneshot::Sender<Option<DeploymentRecord>>,
48    },
49}
50
51#[derive(Default)]
52pub struct DeploymentManagerActor {
53    records: HashMap<String, DeploymentRecord>,
54}
55
56impl DeploymentManagerActor {
57    pub fn new() -> Self {
58        Self::default()
59    }
60}
61
62#[async_trait]
63impl Actor for DeploymentManagerActor {
64    type Msg = DeploymentManagerMsg;
65
66    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Self::Msg) {
67        match msg {
68            DeploymentManagerMsg::Apply { deployment, reply } => {
69                let res = match deployment.validate() {
70                    Ok(()) => {
71                        let name = deployment.name.clone();
72                        self.records.insert(
73                            name,
74                            DeploymentRecord {
75                                deployment,
76                                state: DeploymentState::Pending,
77                            },
78                        );
79                        Ok(())
80                    }
81                    Err(e) => Err(InferenceError::Internal(e.to_string())),
82                };
83                let _ = reply.send(res);
84            }
85            DeploymentManagerMsg::Remove { name, reply } => {
86                self.records.remove(&name);
87                let _ = reply.send(Ok(()));
88            }
89            DeploymentManagerMsg::List { reply } => {
90                let _ = reply.send(self.records.values().cloned().collect());
91            }
92            DeploymentManagerMsg::Get { name, reply } => {
93                let _ = reply.send(self.records.get(&name).cloned());
94            }
95        }
96    }
97}