1use crate::{
2 childwait::{ChildEvent, ChildWait},
3 death,
4 exec::ZoneExecTask,
5 metrics::MetricsCollector,
6};
7use anyhow::Result;
8use cgroups_rs::Cgroup;
9use krata::idm::{
10 client::{IdmClientStreamResponseHandle, IdmInternalClient},
11 internal::{
12 event::Event as EventType, request::Request as RequestType,
13 response::Response as ResponseType, Event, ExecStreamResponseUpdate, ExitEvent,
14 MetricsResponse, PingResponse, Request, Response,
15 },
16};
17use log::debug;
18use nix::unistd::Pid;
19use tokio::sync::broadcast::Receiver;
20use tokio::{select, sync::broadcast};
21
22pub struct ZoneBackground {
23 idm: IdmInternalClient,
24 child: Pid,
25 _cgroup: Cgroup,
26 wait: ChildWait,
27 child_receiver: Receiver<ChildEvent>,
28}
29
30impl ZoneBackground {
31 pub async fn new(idm: IdmInternalClient, cgroup: Cgroup, child: Pid) -> Result<ZoneBackground> {
32 let (wait, child_receiver) = ChildWait::new()?;
33 Ok(ZoneBackground {
34 idm,
35 child,
36 _cgroup: cgroup,
37 wait,
38 child_receiver,
39 })
40 }
41
42 pub async fn run(&mut self) -> Result<()> {
43 let mut event_subscription = self.idm.subscribe().await?;
44 let mut requests_subscription = self.idm.requests().await?;
45 let mut request_streams_subscription = self.idm.request_streams().await?;
46 loop {
47 select! {
48 x = event_subscription.recv() => match x {
49 Ok(_event) => {
50 },
51
52 Err(broadcast::error::RecvError::Closed) => {
53 debug!("idm packet channel closed");
54 break;
55 },
56
57 _ => {
58 continue;
59 }
60 },
61
62 x = requests_subscription.recv() => match x {
63 Ok((id, request)) => {
64 self.handle_idm_request(id, request).await?;
65 },
66
67 Err(broadcast::error::RecvError::Closed) => {
68 debug!("idm packet channel closed");
69 break;
70 },
71
72 _ => {
73 continue;
74 }
75 },
76
77 x = request_streams_subscription.recv() => match x {
78 Ok(handle) => {
79 self.handle_idm_stream_request(handle).await?;
80 },
81
82 Err(broadcast::error::RecvError::Closed) => {
83 debug!("idm packet channel closed");
84 break;
85 },
86
87 _ => {
88 continue;
89 }
90 },
91
92 event = self.child_receiver.recv() => match event {
93 Ok(event) => self.child_event(event).await?,
94 Err(_) => {
95 break;
96 }
97 }
98 }
99 }
100 Ok(())
101 }
102
103 async fn handle_idm_request(&mut self, id: u64, packet: Request) -> Result<()> {
104 match packet.request {
105 Some(RequestType::Ping(_)) => {
106 self.idm
107 .respond(
108 id,
109 Response {
110 response: Some(ResponseType::Ping(PingResponse {})),
111 },
112 )
113 .await?;
114 }
115
116 Some(RequestType::Metrics(_)) => {
117 let metrics = MetricsCollector::new()?;
118 let root = metrics.collect()?;
119 let response = Response {
120 response: Some(ResponseType::Metrics(MetricsResponse { root: Some(root) })),
121 };
122
123 self.idm.respond(id, response).await?;
124 }
125
126 _ => {}
127 }
128 Ok(())
129 }
130
131 async fn handle_idm_stream_request(
132 &mut self,
133 handle: IdmClientStreamResponseHandle<Request>,
134 ) -> Result<()> {
135 let wait = self.wait.clone();
136 if let Some(RequestType::ExecStream(_)) = &handle.initial.request {
137 tokio::task::spawn(async move {
138 let exec = ZoneExecTask { wait, handle };
139 if let Err(error) = exec.run().await {
140 let _ = exec
141 .handle
142 .respond(Response {
143 response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate {
144 exited: true,
145 error: error.to_string(),
146 exit_code: -1,
147 stdout: vec![],
148 stderr: vec![],
149 })),
150 })
151 .await;
152 }
153 });
154 }
155 Ok(())
156 }
157
158 async fn child_event(&mut self, event: ChildEvent) -> Result<()> {
159 if event.pid == self.child {
160 self.idm
161 .emit(Event {
162 event: Some(EventType::Exit(ExitEvent { code: event.status })),
163 })
164 .await?;
165 death(event.status).await?;
166 }
167 Ok(())
168 }
169}