rust_control_plane/service/
common.rs1use crate::cache::{Cache, FetchError};
2use crate::service::stream::handle_stream;
3use data_plane_api::envoy::service::discovery::v3::{DiscoveryRequest, DiscoveryResponse};
4use futures::Stream;
5use std::pin::Pin;
6use std::sync::Arc;
7use tokio::sync::mpsc;
8use tokio_stream::wrappers::ReceiverStream;
9use tonic::{Request, Response};
10use tonic::{Status, Streaming};
11
12#[derive(Debug)]
13pub struct Service {
14 pub cache: Arc<Cache>,
15}
16
17pub type StreamResponse<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + 'static>>;
18
19impl Service {
20 pub fn new(cache: Arc<Cache>) -> Self {
21 Self { cache }
22 }
23
24 pub fn stream(
25 &self,
26 req: Request<Streaming<DiscoveryRequest>>,
27 type_url: &'static str,
28 ) -> Result<Response<StreamResponse<DiscoveryResponse>>, Status> {
29 let input = req.into_inner();
30 let (tx, rx) = mpsc::channel(1);
31 let output = ReceiverStream::new(rx);
32 let cache_clone = self.cache.clone();
33 tokio::spawn(async move { handle_stream(input, tx, type_url, cache_clone).await });
34 Ok(Response::new(
35 Box::pin(output) as StreamResponse<DiscoveryResponse>
36 ))
37 }
38
39 pub async fn fetch(
40 &self,
41 req: &DiscoveryRequest,
42 type_url: &'static str,
43 ) -> Result<Response<DiscoveryResponse>, Status> {
44 match self.cache.fetch(req, type_url).await {
45 Ok(resp) => Ok(Response::new(resp)),
46 Err(FetchError::NotFound) => Err(Status::not_found("Resource not found for node")),
47 Err(FetchError::VersionUpToDate) => {
48 Err(Status::already_exists("Version already up to date"))
49 }
50 }
51 }
52}