1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// Copyright 2021 The Engula Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::StreamExt;
use tonic::{Request, Response, Status};

use super::{compose::Kernel as ComposeKernel, proto::*};
use crate::{manifest::Manifest, Kernel, KernelUpdate};

pub struct Server<M: Manifest> {
    journal_address: String,
    storage_address: String,
    kernel: ComposeKernel<M>,
}

impl<M: Manifest> Server<M> {
    pub fn new(journal_addr: &str, storage_addr: &str, kernel: ComposeKernel<M>) -> Self {
        Server {
            journal_address: journal_addr.to_owned(),
            storage_address: storage_addr.to_owned(),
            kernel,
        }
    }

    pub fn into_service(self) -> kernel_server::KernelServer<Self> {
        kernel_server::KernelServer::new(self)
    }
}

#[tonic::async_trait]
impl<M: Manifest> kernel_server::Kernel for Server<M> {
    type VersionUpdatesStream =
        Box<dyn futures::Stream<Item = Result<VersionUpdatesResponse, Status>> + Send + Unpin>;

    async fn apply_update(
        &self,
        request: Request<ApplyUpdateRequest>,
    ) -> Result<Response<ApplyUpdateResponse>, Status> {
        let input = request.into_inner();
        if let Some(update) = input.version_update {
            self.kernel.apply_update(KernelUpdate { update }).await?;
        }
        Ok(Response::new(ApplyUpdateResponse {}))
    }

    async fn current_version(
        &self,
        _request: Request<CurrentVersionRequest>,
    ) -> Result<Response<CurrentVersionResponse>, Status> {
        let version = self.kernel.current_version().await?;
        Ok(Response::new(CurrentVersionResponse {
            version: Some((*version).clone()),
        }))
    }

    async fn version_updates(
        &self,
        request: Request<VersionUpdatesRequest>,
    ) -> Result<Response<Self::VersionUpdatesStream>, Status> {
        let input = request.into_inner();
        let updates_stream = self.kernel.version_updates(input.sequence).await;
        Ok(Response::new(Box::new(updates_stream.map(
            |result| match result {
                Ok(version_update) => Ok(VersionUpdatesResponse {
                    version_update: Some((*version_update).clone()),
                }),
                Err(e) => Err(e.into()),
            },
        ))))
    }

    async fn place_lookup(
        &self,
        _request: Request<PlaceLookupRequest>,
    ) -> Result<Response<PlaceLookupResponse>, Status> {
        Ok(Response::new(PlaceLookupResponse {
            journal_address: self.journal_address.clone(),
            storage_address: self.storage_address.clone(),
        }))
    }
}