1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use futures::StreamExt;
use tonic::{Request, Response, Status};
use super::{compose::Kernel as ComposeKernel, proto::*};
use crate::{manifest::Manifest, Kernel, KernelUpdate};
/// Kernel RPC server backed by a composed kernel.
///
/// Alongside the kernel it keeps the journal and storage addresses that are
/// handed back verbatim by the `place_lookup` RPC.
pub struct Server<M>
where
    M: Manifest,
{
    /// Address reported as `journal_address` in `PlaceLookupResponse`.
    journal_address: String,
    /// Address reported as `storage_address` in `PlaceLookupResponse`.
    storage_address: String,
    /// Kernel that every version-related RPC is delegated to.
    kernel: ComposeKernel<M>,
}
impl<M> Server<M>
where
    M: Manifest,
{
    /// Creates a server around `kernel`.
    ///
    /// `journal_addr` and `storage_addr` are copied into owned strings and
    /// stored so they can later be returned from `place_lookup`.
    pub fn new(journal_addr: &str, storage_addr: &str, kernel: ComposeKernel<M>) -> Self {
        let journal_address = String::from(journal_addr);
        let storage_address = String::from(storage_addr);
        Self {
            journal_address,
            storage_address,
            kernel,
        }
    }

    /// Consumes the server and wraps it in the generated `KernelServer`.
    pub fn into_service(self) -> kernel_server::KernelServer<Self> {
        let service = kernel_server::KernelServer::new(self);
        service
    }
}
#[tonic::async_trait]
impl<M> kernel_server::Kernel for Server<M>
where
    M: Manifest,
{
    /// Boxed stream of responses produced by `version_updates`.
    type VersionUpdatesStream =
        Box<dyn futures::Stream<Item = Result<VersionUpdatesResponse, Status>> + Send + Unpin>;

    /// Forwards the request's version update, if one is present, to the kernel.
    ///
    /// A request without a `version_update` is accepted and answered with an
    /// empty response without touching the kernel. A kernel error is
    /// propagated to the caller as a `Status`.
    async fn apply_update(
        &self,
        request: Request<ApplyUpdateRequest>,
    ) -> Result<Response<ApplyUpdateResponse>, Status> {
        let pending = request.into_inner().version_update;
        if let Some(change) = pending {
            let kernel_update = KernelUpdate { update: change };
            self.kernel.apply_update(kernel_update).await?;
        }
        let reply = ApplyUpdateResponse {};
        Ok(Response::new(reply))
    }

    /// Returns a clone of the kernel's current version.
    ///
    /// The request carries nothing that is read here. A kernel error is
    /// propagated to the caller as a `Status`.
    async fn current_version(
        &self,
        _request: Request<CurrentVersionRequest>,
    ) -> Result<Response<CurrentVersionResponse>, Status> {
        let current = self.kernel.current_version().await?;
        // The kernel hands out a dereferenceable handle; the response needs an
        // owned copy of the value behind it.
        let snapshot = (*current).clone();
        let reply = CurrentVersionResponse {
            version: Some(snapshot),
        };
        Ok(Response::new(reply))
    }

    /// Streams the kernel's version updates for the requested `sequence`.
    ///
    /// Each successful item is cloned into a `VersionUpdatesResponse`; each
    /// failed item is converted into a `Status`.
    async fn version_updates(
        &self,
        request: Request<VersionUpdatesRequest>,
    ) -> Result<Response<Self::VersionUpdatesStream>, Status> {
        let sequence = request.into_inner().sequence;
        let updates = self.kernel.version_updates(sequence).await;
        let responses = updates.map(|item| -> Result<VersionUpdatesResponse, Status> {
            match item {
                Ok(update) => {
                    let version_update = Some((*update).clone());
                    Ok(VersionUpdatesResponse { version_update })
                }
                Err(err) => Err(err.into()),
            }
        });
        Ok(Response::new(Box::new(responses)))
    }

    /// Reports the journal and storage addresses this server was built with.
    ///
    /// The request carries nothing that is read here.
    async fn place_lookup(
        &self,
        _request: Request<PlaceLookupRequest>,
    ) -> Result<Response<PlaceLookupResponse>, Status> {
        let journal_address = self.journal_address.clone();
        let storage_address = self.storage_address.clone();
        let reply = PlaceLookupResponse {
            journal_address,
            storage_address,
        };
        Ok(Response::new(reply))
    }
}