// rust_control_plane/service/common.rs

use crate::cache::{Cache, FetchError};
use crate::service::stream::handle_stream;
use data_plane_api::envoy::service::discovery::v3::{DiscoveryRequest, DiscoveryResponse};
use futures::Stream;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response};
use tonic::{Status, Streaming};
11
12#[derive(Debug)]
13pub struct Service {
14    pub cache: Arc<Cache>,
15}
16
17pub type StreamResponse<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + 'static>>;
18
19impl Service {
20    pub fn new(cache: Arc<Cache>) -> Self {
21        Self { cache }
22    }
23
24    pub fn stream(
25        &self,
26        req: Request<Streaming<DiscoveryRequest>>,
27        type_url: &'static str,
28    ) -> Result<Response<StreamResponse<DiscoveryResponse>>, Status> {
29        let input = req.into_inner();
30        let (tx, rx) = mpsc::channel(1);
31        let output = ReceiverStream::new(rx);
32        let cache_clone = self.cache.clone();
33        tokio::spawn(async move { handle_stream(input, tx, type_url, cache_clone).await });
34        Ok(Response::new(
35            Box::pin(output) as StreamResponse<DiscoveryResponse>
36        ))
37    }
38
39    pub async fn fetch(
40        &self,
41        req: &DiscoveryRequest,
42        type_url: &'static str,
43    ) -> Result<Response<DiscoveryResponse>, Status> {
44        match self.cache.fetch(req, type_url).await {
45            Ok(resp) => Ok(Response::new(resp)),
46            Err(FetchError::NotFound) => Err(Status::not_found("Resource not found for node")),
47            Err(FetchError::VersionUpToDate) => {
48                Err(Status::already_exists("Version already up to date"))
49            }
50        }
51    }
52}