generic_device_plugin/
service.rs

1use std::{pin::Pin, time::Duration};
2
3use tokio::{sync::mpsc, time::sleep};
4use tokio_stream::wrappers::ReceiverStream;
5use tonic::{codegen::tokio_stream::Stream, Request, Response, Status};
6use tracing::{error, info};
7
8use super::pb::{device_plugin_server::DevicePlugin, *};
9
10#[async_trait::async_trait]
11pub trait GenericDevicePlugin: 'static + Sync + Send + Default {
12    const PRE_START_REQUIRED: bool;
13    const GET_PREFERRED_ALLOCATION_AVAILABLE: bool;
14    const RESOURCE_NAME: &'static str;
15    const DEVICE_POLL_INTERVAL: Duration;
16
17    async fn get_devices() -> Result<Vec<Device>, Status>;
18
19    async fn container_allocate(
20        device_ids: Vec<String>,
21    ) -> Result<ContainerAllocateResponse, Status>;
22
23    async fn get_container_preferred_allocation(
24        available_device_ids: Vec<String>,
25        must_include_device_ids: Vec<String>,
26        allocation_size: i32,
27    ) -> Result<ContainerPreferredAllocationResponse, Status>;
28
29    async fn pre_start_container(device_ids: Vec<String>) -> Result<(), Status>;
30}
31
32#[async_trait::async_trait]
33impl<DP: GenericDevicePlugin> DevicePlugin for DP {
34    /// GetDevicePluginOptions returns options to be communicated with Device
35    /// Manager
36    async fn get_device_plugin_options(
37        &self,
38        _request: Request<Empty>,
39    ) -> Result<Response<DevicePluginOptions>, Status> {
40        let resp = DevicePluginOptions {
41            pre_start_required: DP::PRE_START_REQUIRED,
42            get_preferred_allocation_available: DP::GET_PREFERRED_ALLOCATION_AVAILABLE,
43        };
44        return Ok(Response::new(resp));
45    }
46
47    /// Server streaming response type for the ListAndWatch method.
48    type ListAndWatchStream =
49        Pin<Box<dyn Stream<Item = Result<ListAndWatchResponse, Status>> + Send>>;
50
51    /// ListAndWatch returns a stream of List of Devices
52    /// Whenever a Device state change or a Device disappears, ListAndWatch
53    /// returns the new list
54    async fn list_and_watch(
55        &self,
56        _request: Request<Empty>,
57    ) -> Result<Response<Self::ListAndWatchStream>, Status> {
58        let (tx, rx) = mpsc::channel(128);
59        tokio::spawn(async move {
60            let mut prev_devices = Err(Status::unknown(""));
61            loop {
62                if tx.is_closed() {
63                    break;
64                }
65
66                let devices_resp = DP::get_devices().await;
67
68                // if error or changed
69                if devices_resp.is_err() || devices_resp.as_ref().ok() != prev_devices.as_ref().ok()
70                {
71                    prev_devices = devices_resp.clone();
72                    match tx
73                        .send(devices_resp.map(|x| ListAndWatchResponse { devices: x }))
74                        .await
75                    {
76                        Ok(()) => match &prev_devices {
77                            Ok(pd) => info!("found {} devices, new device list sent!", pd.len()),
78                            Err(e) => error!("failed to get devices: {e}"),
79                        },
80                        Err(e) => {
81                            error!("failed to send new device list: {e}");
82                            break;
83                        }
84                    }
85                }
86                sleep(DP::DEVICE_POLL_INTERVAL).await;
87            }
88        });
89        Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
90    }
91
92    /// GetPreferredAllocation returns a preferred set of devices to allocate
93    /// from a list of available ones. The resulting preferred allocation is not
94    /// guaranteed to be the allocation ultimately performed by the
95    /// devicemanager. It is only designed to help the devicemanager make a more
96    /// informed allocation decision when possible.
97    async fn get_preferred_allocation(
98        &self,
99        request: Request<PreferredAllocationRequest>,
100    ) -> Result<Response<PreferredAllocationResponse>, Status> {
101        let request = request.into_inner();
102        let mut container_responses = Vec::with_capacity(request.container_requests.len());
103        for req in request.container_requests {
104            container_responses.push(
105                DP::get_container_preferred_allocation(
106                    req.available_device_i_ds,
107                    req.must_include_device_i_ds,
108                    req.allocation_size,
109                )
110                .await?,
111            );
112        }
113        return Ok(Response::new(PreferredAllocationResponse {
114            container_responses,
115        }));
116    }
117
118    /// Allocate is called during container creation so that the Device
119    /// Plugin can run device specific operations and instruct Kubelet
120    /// of the steps to make the Device available in the container
121    async fn allocate(
122        &self,
123        request: Request<AllocateRequest>,
124    ) -> Result<Response<AllocateResponse>, Status> {
125        let request = request.into_inner();
126        let mut container_responses = Vec::with_capacity(request.container_requests.len());
127        for req in request.container_requests {
128            container_responses.push(DP::container_allocate(req.devices_ids).await?);
129        }
130        return Ok(Response::new(AllocateResponse {
131            container_responses,
132        }));
133    }
134
135    /// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
136    /// before each container start. Device plugin can run device specific operations
137    /// such as resetting the device before making devices available to the container
138    async fn pre_start_container(
139        &self,
140        request: Request<PreStartContainerRequest>,
141    ) -> Result<Response<PreStartContainerResponse>, Status> {
142        DP::pre_start_container(request.into_inner().devices_ids).await?;
143        return Ok(Response::new(PreStartContainerResponse {}));
144    }
145}