kratad/control/
mod.rs

1use std::pin::Pin;
2
3use anyhow::Error;
4use futures::Stream;
5use list_network_reservations::ListNetworkReservationsRpc;
6use tokio::sync::mpsc::Sender;
7use tonic::{Request, Response, Status, Streaming};
8use uuid::Uuid;
9
10use krata::v1::control::{
11    control_service_server::ControlService, CreateZoneReply, CreateZoneRequest, DestroyZoneReply,
12    DestroyZoneRequest, ExecInsideZoneReply, ExecInsideZoneRequest, GetHostCpuTopologyReply,
13    GetHostCpuTopologyRequest, GetHostStatusReply, GetHostStatusRequest, ListDevicesReply,
14    ListDevicesRequest, ListZonesReply, ListZonesRequest, PullImageReply, PullImageRequest,
15    ReadHypervisorConsoleReply, ReadHypervisorConsoleRequest, ReadZoneMetricsReply,
16    ReadZoneMetricsRequest, ResolveZoneIdReply, ResolveZoneIdRequest, SnoopIdmReply,
17    SnoopIdmRequest, UpdateZoneResourcesReply, UpdateZoneResourcesRequest, WatchEventsReply,
18    WatchEventsRequest, ZoneConsoleReply, ZoneConsoleRequest,
19};
20use krata::v1::control::{
21    GetZoneReply, GetZoneRequest, ListNetworkReservationsReply, ListNetworkReservationsRequest,
22    SetHostPowerManagementPolicyReply, SetHostPowerManagementPolicyRequest,
23};
24use krataoci::packer::service::OciPackerService;
25use kratart::Runtime;
26
27use crate::control::attach_zone_console::AttachZoneConsoleRpc;
28use crate::control::create_zone::CreateZoneRpc;
29use crate::control::destroy_zone::DestroyZoneRpc;
30use crate::control::exec_inside_zone::ExecInsideZoneRpc;
31use crate::control::get_host_cpu_topology::GetHostCpuTopologyRpc;
32use crate::control::get_host_status::GetHostStatusRpc;
33use crate::control::get_zone::GetZoneRpc;
34use crate::control::list_devices::ListDevicesRpc;
35use crate::control::list_zones::ListZonesRpc;
36use crate::control::pull_image::PullImageRpc;
37use crate::control::read_hypervisor_console::ReadHypervisorConsoleRpc;
38use crate::control::read_zone_metrics::ReadZoneMetricsRpc;
39use crate::control::resolve_zone_id::ResolveZoneIdRpc;
40use crate::control::set_host_power_management_policy::SetHostPowerManagementPolicyRpc;
41use crate::control::snoop_idm::SnoopIdmRpc;
42use crate::control::update_zone_resources::UpdateZoneResourcesRpc;
43use crate::control::watch_events::WatchEventsRpc;
44use crate::db::zone::ZoneStore;
45use crate::network::assignment::NetworkAssignment;
46use crate::{
47    console::DaemonConsoleHandle, devices::DaemonDeviceManager, event::DaemonEventContext,
48    idm::DaemonIdmHandle, zlt::ZoneLookupTable,
49};
50
51pub mod attach_zone_console;
52pub mod create_zone;
53pub mod destroy_zone;
54pub mod exec_inside_zone;
55pub mod get_host_cpu_topology;
56pub mod get_host_status;
57pub mod get_zone;
58pub mod list_devices;
59pub mod list_network_reservations;
60pub mod list_zones;
61pub mod pull_image;
62pub mod read_hypervisor_console;
63pub mod read_zone_metrics;
64pub mod resolve_zone_id;
65pub mod set_host_power_management_policy;
66pub mod snoop_idm;
67pub mod update_zone_resources;
68pub mod watch_events;
69
70pub struct ApiError {
71    message: String,
72}
73
74impl From<Error> for ApiError {
75    fn from(value: Error) -> Self {
76        ApiError {
77            message: value.to_string(),
78        }
79    }
80}
81
82impl From<ApiError> for Status {
83    fn from(value: ApiError) -> Self {
84        Status::unknown(value.message)
85    }
86}
87
88#[derive(Clone)]
89pub struct DaemonControlService {
90    zlt: ZoneLookupTable,
91    devices: DaemonDeviceManager,
92    events: DaemonEventContext,
93    console: DaemonConsoleHandle,
94    idm: DaemonIdmHandle,
95    zones: ZoneStore,
96    network: NetworkAssignment,
97    zone_reconciler_notify: Sender<Uuid>,
98    packer: OciPackerService,
99    runtime: Runtime,
100}
101
102impl DaemonControlService {
103    #[allow(clippy::too_many_arguments)]
104    pub fn new(
105        zlt: ZoneLookupTable,
106        devices: DaemonDeviceManager,
107        events: DaemonEventContext,
108        console: DaemonConsoleHandle,
109        idm: DaemonIdmHandle,
110        zones: ZoneStore,
111        network: NetworkAssignment,
112        zone_reconciler_notify: Sender<Uuid>,
113        packer: OciPackerService,
114        runtime: Runtime,
115    ) -> Self {
116        Self {
117            zlt,
118            devices,
119            events,
120            console,
121            idm,
122            zones,
123            network,
124            zone_reconciler_notify,
125            packer,
126            runtime,
127        }
128    }
129}
130
131#[tonic::async_trait]
132impl ControlService for DaemonControlService {
133    async fn get_host_status(
134        &self,
135        request: Request<GetHostStatusRequest>,
136    ) -> Result<Response<GetHostStatusReply>, Status> {
137        let request = request.into_inner();
138        adapt(
139            GetHostStatusRpc::new(self.network.clone(), self.zlt.clone())
140                .process(request)
141                .await,
142        )
143    }
144
145    type SnoopIdmStream =
146        Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>;
147
148    async fn snoop_idm(
149        &self,
150        request: Request<SnoopIdmRequest>,
151    ) -> Result<Response<Self::SnoopIdmStream>, Status> {
152        let request = request.into_inner();
153        adapt(
154            SnoopIdmRpc::new(self.idm.clone(), self.zlt.clone())
155                .process(request)
156                .await,
157        )
158    }
159
160    async fn get_host_cpu_topology(
161        &self,
162        request: Request<GetHostCpuTopologyRequest>,
163    ) -> Result<Response<GetHostCpuTopologyReply>, Status> {
164        let request = request.into_inner();
165        adapt(
166            GetHostCpuTopologyRpc::new(self.runtime.clone())
167                .process(request)
168                .await,
169        )
170    }
171
172    async fn set_host_power_management_policy(
173        &self,
174        request: Request<SetHostPowerManagementPolicyRequest>,
175    ) -> Result<Response<SetHostPowerManagementPolicyReply>, Status> {
176        let request = request.into_inner();
177        adapt(
178            SetHostPowerManagementPolicyRpc::new(self.runtime.clone())
179                .process(request)
180                .await,
181        )
182    }
183
184    async fn list_devices(
185        &self,
186        request: Request<ListDevicesRequest>,
187    ) -> Result<Response<ListDevicesReply>, Status> {
188        let request = request.into_inner();
189        adapt(
190            ListDevicesRpc::new(self.devices.clone())
191                .process(request)
192                .await,
193        )
194    }
195
196    async fn list_network_reservations(
197        &self,
198        request: Request<ListNetworkReservationsRequest>,
199    ) -> Result<Response<ListNetworkReservationsReply>, Status> {
200        let request = request.into_inner();
201        adapt(
202            ListNetworkReservationsRpc::new(self.network.clone())
203                .process(request)
204                .await,
205        )
206    }
207
208    type PullImageStream =
209        Pin<Box<dyn Stream<Item = Result<PullImageReply, Status>> + Send + 'static>>;
210
211    async fn pull_image(
212        &self,
213        request: Request<PullImageRequest>,
214    ) -> Result<Response<Self::PullImageStream>, Status> {
215        let request = request.into_inner();
216        adapt(
217            PullImageRpc::new(self.packer.clone())
218                .process(request)
219                .await,
220        )
221    }
222
223    async fn create_zone(
224        &self,
225        request: Request<CreateZoneRequest>,
226    ) -> Result<Response<CreateZoneReply>, Status> {
227        let request = request.into_inner();
228        adapt(
229            CreateZoneRpc::new(
230                self.zones.clone(),
231                self.zlt.clone(),
232                self.zone_reconciler_notify.clone(),
233            )
234            .process(request)
235            .await,
236        )
237    }
238
239    async fn destroy_zone(
240        &self,
241        request: Request<DestroyZoneRequest>,
242    ) -> Result<Response<DestroyZoneReply>, Status> {
243        let request = request.into_inner();
244        adapt(
245            DestroyZoneRpc::new(self.zones.clone(), self.zone_reconciler_notify.clone())
246                .process(request)
247                .await,
248        )
249    }
250
251    async fn resolve_zone_id(
252        &self,
253        request: Request<ResolveZoneIdRequest>,
254    ) -> Result<Response<ResolveZoneIdReply>, Status> {
255        let request = request.into_inner();
256        adapt(
257            ResolveZoneIdRpc::new(self.zones.clone())
258                .process(request)
259                .await,
260        )
261    }
262
263    async fn get_zone(
264        &self,
265        request: Request<GetZoneRequest>,
266    ) -> Result<Response<GetZoneReply>, Status> {
267        let request = request.into_inner();
268        adapt(GetZoneRpc::new(self.zones.clone()).process(request).await)
269    }
270
271    async fn update_zone_resources(
272        &self,
273        request: Request<UpdateZoneResourcesRequest>,
274    ) -> Result<Response<UpdateZoneResourcesReply>, Status> {
275        let request = request.into_inner();
276        adapt(
277            UpdateZoneResourcesRpc::new(self.runtime.clone(), self.zones.clone())
278                .process(request)
279                .await,
280        )
281    }
282
283    async fn list_zones(
284        &self,
285        request: Request<ListZonesRequest>,
286    ) -> Result<Response<ListZonesReply>, Status> {
287        let request = request.into_inner();
288        adapt(ListZonesRpc::new(self.zones.clone()).process(request).await)
289    }
290
291    type AttachZoneConsoleStream =
292        Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>;
293
294    async fn attach_zone_console(
295        &self,
296        request: Request<Streaming<ZoneConsoleRequest>>,
297    ) -> Result<Response<Self::AttachZoneConsoleStream>, Status> {
298        let input = request.into_inner();
299        adapt(
300            AttachZoneConsoleRpc::new(self.console.clone())
301                .process(input)
302                .await,
303        )
304    }
305
306    type ExecInsideZoneStream =
307        Pin<Box<dyn Stream<Item = Result<ExecInsideZoneReply, Status>> + Send + 'static>>;
308
309    async fn exec_inside_zone(
310        &self,
311        request: Request<Streaming<ExecInsideZoneRequest>>,
312    ) -> Result<Response<Self::ExecInsideZoneStream>, Status> {
313        let input = request.into_inner();
314        adapt(
315            ExecInsideZoneRpc::new(self.idm.clone())
316                .process(input)
317                .await,
318        )
319    }
320
321    async fn read_zone_metrics(
322        &self,
323        request: Request<ReadZoneMetricsRequest>,
324    ) -> Result<Response<ReadZoneMetricsReply>, Status> {
325        let request = request.into_inner();
326        adapt(
327            ReadZoneMetricsRpc::new(self.idm.clone())
328                .process(request)
329                .await,
330        )
331    }
332
333    type WatchEventsStream =
334        Pin<Box<dyn Stream<Item = Result<WatchEventsReply, Status>> + Send + 'static>>;
335
336    async fn watch_events(
337        &self,
338        request: Request<WatchEventsRequest>,
339    ) -> Result<Response<Self::WatchEventsStream>, Status> {
340        let request = request.into_inner();
341        adapt(
342            WatchEventsRpc::new(self.events.clone())
343                .process(request)
344                .await,
345        )
346    }
347
348    async fn read_hypervisor_console(
349        &self,
350        request: Request<ReadHypervisorConsoleRequest>,
351    ) -> Result<Response<ReadHypervisorConsoleReply>, Status> {
352        let request = request.into_inner();
353        adapt(
354            ReadHypervisorConsoleRpc::new(self.runtime.clone())
355                .process(request)
356                .await,
357        )
358    }
359}
360
361fn adapt<T>(result: anyhow::Result<T>) -> Result<Response<T>, Status> {
362    result
363        .map(Response::new)
364        .map_err(|error| Status::unknown(error.to_string()))
365}