generic_device_plugin/
service.rs1use std::{pin::Pin, time::Duration};
2
3use tokio::{sync::mpsc, time::sleep};
4use tokio_stream::wrappers::ReceiverStream;
5use tonic::{codegen::tokio_stream::Stream, Request, Response, Status};
6use tracing::{error, info};
7
8use super::pb::{device_plugin_server::DevicePlugin, *};
9
10#[async_trait::async_trait]
11pub trait GenericDevicePlugin: 'static + Sync + Send + Default {
12 const PRE_START_REQUIRED: bool;
13 const GET_PREFERRED_ALLOCATION_AVAILABLE: bool;
14 const RESOURCE_NAME: &'static str;
15 const DEVICE_POLL_INTERVAL: Duration;
16
17 async fn get_devices() -> Result<Vec<Device>, Status>;
18
19 async fn container_allocate(
20 device_ids: Vec<String>,
21 ) -> Result<ContainerAllocateResponse, Status>;
22
23 async fn get_container_preferred_allocation(
24 available_device_ids: Vec<String>,
25 must_include_device_ids: Vec<String>,
26 allocation_size: i32,
27 ) -> Result<ContainerPreferredAllocationResponse, Status>;
28
29 async fn pre_start_container(device_ids: Vec<String>) -> Result<(), Status>;
30}
31
32#[async_trait::async_trait]
33impl<DP: GenericDevicePlugin> DevicePlugin for DP {
34 async fn get_device_plugin_options(
37 &self,
38 _request: Request<Empty>,
39 ) -> Result<Response<DevicePluginOptions>, Status> {
40 let resp = DevicePluginOptions {
41 pre_start_required: DP::PRE_START_REQUIRED,
42 get_preferred_allocation_available: DP::GET_PREFERRED_ALLOCATION_AVAILABLE,
43 };
44 return Ok(Response::new(resp));
45 }
46
47 type ListAndWatchStream =
49 Pin<Box<dyn Stream<Item = Result<ListAndWatchResponse, Status>> + Send>>;
50
51 async fn list_and_watch(
55 &self,
56 _request: Request<Empty>,
57 ) -> Result<Response<Self::ListAndWatchStream>, Status> {
58 let (tx, rx) = mpsc::channel(128);
59 tokio::spawn(async move {
60 let mut prev_devices = Err(Status::unknown(""));
61 loop {
62 if tx.is_closed() {
63 break;
64 }
65
66 let devices_resp = DP::get_devices().await;
67
68 if devices_resp.is_err() || devices_resp.as_ref().ok() != prev_devices.as_ref().ok()
70 {
71 prev_devices = devices_resp.clone();
72 match tx
73 .send(devices_resp.map(|x| ListAndWatchResponse { devices: x }))
74 .await
75 {
76 Ok(()) => match &prev_devices {
77 Ok(pd) => info!("found {} devices, new device list sent!", pd.len()),
78 Err(e) => error!("failed to get devices: {e}"),
79 },
80 Err(e) => {
81 error!("failed to send new device list: {e}");
82 break;
83 }
84 }
85 }
86 sleep(DP::DEVICE_POLL_INTERVAL).await;
87 }
88 });
89 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
90 }
91
92 async fn get_preferred_allocation(
98 &self,
99 request: Request<PreferredAllocationRequest>,
100 ) -> Result<Response<PreferredAllocationResponse>, Status> {
101 let request = request.into_inner();
102 let mut container_responses = Vec::with_capacity(request.container_requests.len());
103 for req in request.container_requests {
104 container_responses.push(
105 DP::get_container_preferred_allocation(
106 req.available_device_i_ds,
107 req.must_include_device_i_ds,
108 req.allocation_size,
109 )
110 .await?,
111 );
112 }
113 return Ok(Response::new(PreferredAllocationResponse {
114 container_responses,
115 }));
116 }
117
118 async fn allocate(
122 &self,
123 request: Request<AllocateRequest>,
124 ) -> Result<Response<AllocateResponse>, Status> {
125 let request = request.into_inner();
126 let mut container_responses = Vec::with_capacity(request.container_requests.len());
127 for req in request.container_requests {
128 container_responses.push(DP::container_allocate(req.devices_ids).await?);
129 }
130 return Ok(Response::new(AllocateResponse {
131 container_responses,
132 }));
133 }
134
135 async fn pre_start_container(
139 &self,
140 request: Request<PreStartContainerRequest>,
141 ) -> Result<Response<PreStartContainerResponse>, Status> {
142 DP::pre_start_container(request.into_inner().devices_ids).await?;
143 return Ok(Response::new(PreStartContainerResponse {}));
144 }
145}