Skip to main content

kratazone/
background.rs

1use crate::{
2    childwait::{ChildEvent, ChildWait},
3    death,
4    exec::ZoneExecTask,
5    metrics::MetricsCollector,
6};
7use anyhow::Result;
8use cgroups_rs::Cgroup;
9use krata::idm::{
10    client::{IdmClientStreamResponseHandle, IdmInternalClient},
11    internal::{
12        event::Event as EventType, request::Request as RequestType,
13        response::Response as ResponseType, Event, ExecStreamResponseUpdate, ExitEvent,
14        MetricsResponse, PingResponse, Request, Response,
15    },
16};
17use log::debug;
18use nix::unistd::Pid;
19use tokio::sync::broadcast::Receiver;
20use tokio::{select, sync::broadcast};
21
22pub struct ZoneBackground {
23    idm: IdmInternalClient,
24    child: Pid,
25    _cgroup: Cgroup,
26    wait: ChildWait,
27    child_receiver: Receiver<ChildEvent>,
28}
29
30impl ZoneBackground {
31    pub async fn new(idm: IdmInternalClient, cgroup: Cgroup, child: Pid) -> Result<ZoneBackground> {
32        let (wait, child_receiver) = ChildWait::new()?;
33        Ok(ZoneBackground {
34            idm,
35            child,
36            _cgroup: cgroup,
37            wait,
38            child_receiver,
39        })
40    }
41
42    pub async fn run(&mut self) -> Result<()> {
43        let mut event_subscription = self.idm.subscribe().await?;
44        let mut requests_subscription = self.idm.requests().await?;
45        let mut request_streams_subscription = self.idm.request_streams().await?;
46        loop {
47            select! {
48                x = event_subscription.recv() => match x {
49                    Ok(_event) => {
50                    },
51
52                    Err(broadcast::error::RecvError::Closed) => {
53                        debug!("idm packet channel closed");
54                        break;
55                    },
56
57                    _ => {
58                        continue;
59                    }
60                },
61
62                x = requests_subscription.recv() => match x {
63                    Ok((id, request)) => {
64                        self.handle_idm_request(id, request).await?;
65                    },
66
67                    Err(broadcast::error::RecvError::Closed) => {
68                        debug!("idm packet channel closed");
69                        break;
70                    },
71
72                    _ => {
73                        continue;
74                    }
75                },
76
77                x = request_streams_subscription.recv() => match x {
78                    Ok(handle) => {
79                        self.handle_idm_stream_request(handle).await?;
80                    },
81
82                    Err(broadcast::error::RecvError::Closed) => {
83                        debug!("idm packet channel closed");
84                        break;
85                    },
86
87                    _ => {
88                        continue;
89                    }
90                },
91
92                event = self.child_receiver.recv() => match event {
93                    Ok(event) => self.child_event(event).await?,
94                    Err(_) => {
95                        break;
96                    }
97                }
98            }
99        }
100        Ok(())
101    }
102
103    async fn handle_idm_request(&mut self, id: u64, packet: Request) -> Result<()> {
104        match packet.request {
105            Some(RequestType::Ping(_)) => {
106                self.idm
107                    .respond(
108                        id,
109                        Response {
110                            response: Some(ResponseType::Ping(PingResponse {})),
111                        },
112                    )
113                    .await?;
114            }
115
116            Some(RequestType::Metrics(_)) => {
117                let metrics = MetricsCollector::new()?;
118                let root = metrics.collect()?;
119                let response = Response {
120                    response: Some(ResponseType::Metrics(MetricsResponse { root: Some(root) })),
121                };
122
123                self.idm.respond(id, response).await?;
124            }
125
126            _ => {}
127        }
128        Ok(())
129    }
130
131    async fn handle_idm_stream_request(
132        &mut self,
133        handle: IdmClientStreamResponseHandle<Request>,
134    ) -> Result<()> {
135        let wait = self.wait.clone();
136        if let Some(RequestType::ExecStream(_)) = &handle.initial.request {
137            tokio::task::spawn(async move {
138                let exec = ZoneExecTask { wait, handle };
139                if let Err(error) = exec.run().await {
140                    let _ = exec
141                        .handle
142                        .respond(Response {
143                            response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate {
144                                exited: true,
145                                error: error.to_string(),
146                                exit_code: -1,
147                                stdout: vec![],
148                                stderr: vec![],
149                            })),
150                        })
151                        .await;
152                }
153            });
154        }
155        Ok(())
156    }
157
158    async fn child_event(&mut self, event: ChildEvent) -> Result<()> {
159        if event.pid == self.child {
160            self.idm
161                .emit(Event {
162                    event: Some(EventType::Exit(ExitEvent { code: event.status })),
163                })
164                .await?;
165            death(event.status).await?;
166        }
167        Ok(())
168    }
169}