1use std::pin::Pin;
2
3use anyhow::Error;
4use futures::Stream;
5use list_network_reservations::ListNetworkReservationsRpc;
6use tokio::sync::mpsc::Sender;
7use tonic::{Request, Response, Status, Streaming};
8use uuid::Uuid;
9
10use krata::v1::control::{
11 control_service_server::ControlService, CreateZoneReply, CreateZoneRequest, DestroyZoneReply,
12 DestroyZoneRequest, ExecInsideZoneReply, ExecInsideZoneRequest, GetHostCpuTopologyReply,
13 GetHostCpuTopologyRequest, GetHostStatusReply, GetHostStatusRequest, ListDevicesReply,
14 ListDevicesRequest, ListZonesReply, ListZonesRequest, PullImageReply, PullImageRequest,
15 ReadHypervisorConsoleReply, ReadHypervisorConsoleRequest, ReadZoneMetricsReply,
16 ReadZoneMetricsRequest, ResolveZoneIdReply, ResolveZoneIdRequest, SnoopIdmReply,
17 SnoopIdmRequest, UpdateZoneResourcesReply, UpdateZoneResourcesRequest, WatchEventsReply,
18 WatchEventsRequest, ZoneConsoleReply, ZoneConsoleRequest,
19};
20use krata::v1::control::{
21 GetZoneReply, GetZoneRequest, ListNetworkReservationsReply, ListNetworkReservationsRequest,
22 SetHostPowerManagementPolicyReply, SetHostPowerManagementPolicyRequest,
23};
24use krataoci::packer::service::OciPackerService;
25use kratart::Runtime;
26
27use crate::control::attach_zone_console::AttachZoneConsoleRpc;
28use crate::control::create_zone::CreateZoneRpc;
29use crate::control::destroy_zone::DestroyZoneRpc;
30use crate::control::exec_inside_zone::ExecInsideZoneRpc;
31use crate::control::get_host_cpu_topology::GetHostCpuTopologyRpc;
32use crate::control::get_host_status::GetHostStatusRpc;
33use crate::control::get_zone::GetZoneRpc;
34use crate::control::list_devices::ListDevicesRpc;
35use crate::control::list_zones::ListZonesRpc;
36use crate::control::pull_image::PullImageRpc;
37use crate::control::read_hypervisor_console::ReadHypervisorConsoleRpc;
38use crate::control::read_zone_metrics::ReadZoneMetricsRpc;
39use crate::control::resolve_zone_id::ResolveZoneIdRpc;
40use crate::control::set_host_power_management_policy::SetHostPowerManagementPolicyRpc;
41use crate::control::snoop_idm::SnoopIdmRpc;
42use crate::control::update_zone_resources::UpdateZoneResourcesRpc;
43use crate::control::watch_events::WatchEventsRpc;
44use crate::db::zone::ZoneStore;
45use crate::network::assignment::NetworkAssignment;
46use crate::{
47 console::DaemonConsoleHandle, devices::DaemonDeviceManager, event::DaemonEventContext,
48 idm::DaemonIdmHandle, zlt::ZoneLookupTable,
49};
50
51pub mod attach_zone_console;
52pub mod create_zone;
53pub mod destroy_zone;
54pub mod exec_inside_zone;
55pub mod get_host_cpu_topology;
56pub mod get_host_status;
57pub mod get_zone;
58pub mod list_devices;
59pub mod list_network_reservations;
60pub mod list_zones;
61pub mod pull_image;
62pub mod read_hypervisor_console;
63pub mod read_zone_metrics;
64pub mod resolve_zone_id;
65pub mod set_host_power_management_policy;
66pub mod snoop_idm;
67pub mod update_zone_resources;
68pub mod watch_events;
69
/// Error value carrying a human-readable message for the control API.
///
/// It can be built from an [`anyhow::Error`](Error) and converted into a
/// [`Status`] through the `From` implementations in this module.
///
/// `Debug` is derived so the error can be logged with `{:?}` and used with
/// `Result::unwrap`/`expect`, which require the error type to be `Debug`.
#[derive(Debug)]
pub struct ApiError {
    /// Text handed to the gRPC status when this error is converted.
    message: String,
}
73
74impl From<Error> for ApiError {
75 fn from(value: Error) -> Self {
76 ApiError {
77 message: value.to_string(),
78 }
79 }
80}
81
82impl From<ApiError> for Status {
83 fn from(value: ApiError) -> Self {
84 Status::unknown(value.message)
85 }
86}
87
88#[derive(Clone)]
89pub struct DaemonControlService {
90 zlt: ZoneLookupTable,
91 devices: DaemonDeviceManager,
92 events: DaemonEventContext,
93 console: DaemonConsoleHandle,
94 idm: DaemonIdmHandle,
95 zones: ZoneStore,
96 network: NetworkAssignment,
97 zone_reconciler_notify: Sender<Uuid>,
98 packer: OciPackerService,
99 runtime: Runtime,
100}
101
102impl DaemonControlService {
103 #[allow(clippy::too_many_arguments)]
104 pub fn new(
105 zlt: ZoneLookupTable,
106 devices: DaemonDeviceManager,
107 events: DaemonEventContext,
108 console: DaemonConsoleHandle,
109 idm: DaemonIdmHandle,
110 zones: ZoneStore,
111 network: NetworkAssignment,
112 zone_reconciler_notify: Sender<Uuid>,
113 packer: OciPackerService,
114 runtime: Runtime,
115 ) -> Self {
116 Self {
117 zlt,
118 devices,
119 events,
120 console,
121 idm,
122 zones,
123 network,
124 zone_reconciler_notify,
125 packer,
126 runtime,
127 }
128 }
129}
130
131#[tonic::async_trait]
132impl ControlService for DaemonControlService {
133 async fn get_host_status(
134 &self,
135 request: Request<GetHostStatusRequest>,
136 ) -> Result<Response<GetHostStatusReply>, Status> {
137 let request = request.into_inner();
138 adapt(
139 GetHostStatusRpc::new(self.network.clone(), self.zlt.clone())
140 .process(request)
141 .await,
142 )
143 }
144
145 type SnoopIdmStream =
146 Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>;
147
148 async fn snoop_idm(
149 &self,
150 request: Request<SnoopIdmRequest>,
151 ) -> Result<Response<Self::SnoopIdmStream>, Status> {
152 let request = request.into_inner();
153 adapt(
154 SnoopIdmRpc::new(self.idm.clone(), self.zlt.clone())
155 .process(request)
156 .await,
157 )
158 }
159
160 async fn get_host_cpu_topology(
161 &self,
162 request: Request<GetHostCpuTopologyRequest>,
163 ) -> Result<Response<GetHostCpuTopologyReply>, Status> {
164 let request = request.into_inner();
165 adapt(
166 GetHostCpuTopologyRpc::new(self.runtime.clone())
167 .process(request)
168 .await,
169 )
170 }
171
172 async fn set_host_power_management_policy(
173 &self,
174 request: Request<SetHostPowerManagementPolicyRequest>,
175 ) -> Result<Response<SetHostPowerManagementPolicyReply>, Status> {
176 let request = request.into_inner();
177 adapt(
178 SetHostPowerManagementPolicyRpc::new(self.runtime.clone())
179 .process(request)
180 .await,
181 )
182 }
183
184 async fn list_devices(
185 &self,
186 request: Request<ListDevicesRequest>,
187 ) -> Result<Response<ListDevicesReply>, Status> {
188 let request = request.into_inner();
189 adapt(
190 ListDevicesRpc::new(self.devices.clone())
191 .process(request)
192 .await,
193 )
194 }
195
196 async fn list_network_reservations(
197 &self,
198 request: Request<ListNetworkReservationsRequest>,
199 ) -> Result<Response<ListNetworkReservationsReply>, Status> {
200 let request = request.into_inner();
201 adapt(
202 ListNetworkReservationsRpc::new(self.network.clone())
203 .process(request)
204 .await,
205 )
206 }
207
208 type PullImageStream =
209 Pin<Box<dyn Stream<Item = Result<PullImageReply, Status>> + Send + 'static>>;
210
211 async fn pull_image(
212 &self,
213 request: Request<PullImageRequest>,
214 ) -> Result<Response<Self::PullImageStream>, Status> {
215 let request = request.into_inner();
216 adapt(
217 PullImageRpc::new(self.packer.clone())
218 .process(request)
219 .await,
220 )
221 }
222
223 async fn create_zone(
224 &self,
225 request: Request<CreateZoneRequest>,
226 ) -> Result<Response<CreateZoneReply>, Status> {
227 let request = request.into_inner();
228 adapt(
229 CreateZoneRpc::new(
230 self.zones.clone(),
231 self.zlt.clone(),
232 self.zone_reconciler_notify.clone(),
233 )
234 .process(request)
235 .await,
236 )
237 }
238
239 async fn destroy_zone(
240 &self,
241 request: Request<DestroyZoneRequest>,
242 ) -> Result<Response<DestroyZoneReply>, Status> {
243 let request = request.into_inner();
244 adapt(
245 DestroyZoneRpc::new(self.zones.clone(), self.zone_reconciler_notify.clone())
246 .process(request)
247 .await,
248 )
249 }
250
251 async fn resolve_zone_id(
252 &self,
253 request: Request<ResolveZoneIdRequest>,
254 ) -> Result<Response<ResolveZoneIdReply>, Status> {
255 let request = request.into_inner();
256 adapt(
257 ResolveZoneIdRpc::new(self.zones.clone())
258 .process(request)
259 .await,
260 )
261 }
262
263 async fn get_zone(
264 &self,
265 request: Request<GetZoneRequest>,
266 ) -> Result<Response<GetZoneReply>, Status> {
267 let request = request.into_inner();
268 adapt(GetZoneRpc::new(self.zones.clone()).process(request).await)
269 }
270
271 async fn update_zone_resources(
272 &self,
273 request: Request<UpdateZoneResourcesRequest>,
274 ) -> Result<Response<UpdateZoneResourcesReply>, Status> {
275 let request = request.into_inner();
276 adapt(
277 UpdateZoneResourcesRpc::new(self.runtime.clone(), self.zones.clone())
278 .process(request)
279 .await,
280 )
281 }
282
283 async fn list_zones(
284 &self,
285 request: Request<ListZonesRequest>,
286 ) -> Result<Response<ListZonesReply>, Status> {
287 let request = request.into_inner();
288 adapt(ListZonesRpc::new(self.zones.clone()).process(request).await)
289 }
290
291 type AttachZoneConsoleStream =
292 Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>;
293
294 async fn attach_zone_console(
295 &self,
296 request: Request<Streaming<ZoneConsoleRequest>>,
297 ) -> Result<Response<Self::AttachZoneConsoleStream>, Status> {
298 let input = request.into_inner();
299 adapt(
300 AttachZoneConsoleRpc::new(self.console.clone())
301 .process(input)
302 .await,
303 )
304 }
305
306 type ExecInsideZoneStream =
307 Pin<Box<dyn Stream<Item = Result<ExecInsideZoneReply, Status>> + Send + 'static>>;
308
309 async fn exec_inside_zone(
310 &self,
311 request: Request<Streaming<ExecInsideZoneRequest>>,
312 ) -> Result<Response<Self::ExecInsideZoneStream>, Status> {
313 let input = request.into_inner();
314 adapt(
315 ExecInsideZoneRpc::new(self.idm.clone())
316 .process(input)
317 .await,
318 )
319 }
320
321 async fn read_zone_metrics(
322 &self,
323 request: Request<ReadZoneMetricsRequest>,
324 ) -> Result<Response<ReadZoneMetricsReply>, Status> {
325 let request = request.into_inner();
326 adapt(
327 ReadZoneMetricsRpc::new(self.idm.clone())
328 .process(request)
329 .await,
330 )
331 }
332
333 type WatchEventsStream =
334 Pin<Box<dyn Stream<Item = Result<WatchEventsReply, Status>> + Send + 'static>>;
335
336 async fn watch_events(
337 &self,
338 request: Request<WatchEventsRequest>,
339 ) -> Result<Response<Self::WatchEventsStream>, Status> {
340 let request = request.into_inner();
341 adapt(
342 WatchEventsRpc::new(self.events.clone())
343 .process(request)
344 .await,
345 )
346 }
347
348 async fn read_hypervisor_console(
349 &self,
350 request: Request<ReadHypervisorConsoleRequest>,
351 ) -> Result<Response<ReadHypervisorConsoleReply>, Status> {
352 let request = request.into_inner();
353 adapt(
354 ReadHypervisorConsoleRpc::new(self.runtime.clone())
355 .process(request)
356 .await,
357 )
358 }
359}
360
361fn adapt<T>(result: anyhow::Result<T>) -> Result<Response<T>, Status> {
362 result
363 .map(Response::new)
364 .map_err(|error| Status::unknown(error.to_string()))
365}