1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use std::sync::Arc;
use engula_journal::{grpc as grpc_journal, Error as JournalError, Journal};
use engula_storage::{grpc as grpc_storage, Error as StorageError, Storage};
use futures::StreamExt;
use super::{client::Client, proto::*};
use crate::{async_trait, Error, KernelUpdate, Result, ResultStream, Sequence};
/// Name of the journal stream that the kernel looks up (and creates when it
/// is reported as not found).
pub(crate) const DEFAULT_STREAM: &str = "DEFAULT";
/// Name of the storage bucket that the kernel looks up (and creates when it
/// is reported as not found).
pub(crate) const DEFAULT_BUCKET: &str = "DEFAULT";
/// A kernel handle that talks to remote services.
///
/// It bundles the kernel service client together with the gRPC journal and
/// gRPC storage handles. The whole handle is `Clone`, which clones each of
/// the three members.
#[derive(Clone)]
pub struct Kernel {
/// Client for the kernel service itself (version queries and updates).
client: Client,
/// Handle to the journal service; used to look up / create the default stream.
journal: grpc_journal::Journal,
/// Handle to the storage service; used to look up / create the default bucket.
storage: grpc_storage::Storage,
}
impl Kernel {
    /// Connects to the kernel service listening at `addr`.
    ///
    /// `addr` is given without a scheme; `http://` is prepended before
    /// connecting. Once the kernel client is up, a `place_lookup` request is
    /// issued and the journal and storage addresses from its response are
    /// used to connect to those two services, in that order.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while connecting to the kernel service,
    /// performing the place lookup, or connecting to the journal or storage.
    pub async fn connect(addr: &str) -> Result<Self> {
        let kernel_url = format!("http://{}", addr);
        let client = Client::connect(&kernel_url).await?;

        // The kernel service tells us where the journal and storage live.
        let placement = client.place_lookup(PlaceLookupRequest {}).await?;
        let journal = grpc_journal::Journal::connect(&placement.journal_address).await?;
        let storage = grpc_storage::Storage::connect(&placement.storage_address).await?;

        Ok(Self {
            client,
            journal,
            storage,
        })
    }
}
#[async_trait]
impl crate::Kernel for Kernel {
type Bucket = grpc_storage::Bucket;
type Stream = grpc_journal::Stream;
async fn stream(&self) -> Result<Self::Stream> {
match self.journal.stream(DEFAULT_STREAM).await {
Ok(stream) => Ok(stream),
Err(JournalError::NotFound(_)) => {
Ok(self.journal.create_stream(DEFAULT_STREAM).await?)
}
Err(e) => Err(e.into()),
}
}
async fn bucket(&self) -> Result<Self::Bucket> {
match self.storage.bucket(DEFAULT_BUCKET).await {
Ok(bucket) => Ok(bucket),
Err(StorageError::NotFound(_)) => {
Ok(self.storage.create_bucket(DEFAULT_BUCKET).await?)
}
Err(e) => Err(e.into()),
}
}
async fn apply_update(&self, update: KernelUpdate) -> Result<()> {
let input = ApplyUpdateRequest {
version_update: Some(update.update),
};
self.client.apply_update(input).await?;
Ok(())
}
async fn current_version(&self) -> Result<Arc<Version>> {
let input = CurrentVersionRequest {};
let resp = self.client.current_version(input).await?;
let version = resp
.version
.ok_or_else(|| Error::Internal("CurrentVersionResponse::version is none".into()))?;
Ok(Arc::new(version))
}
async fn version_updates(&self, sequence: Sequence) -> ResultStream<Arc<VersionUpdate>> {
let input = VersionUpdatesRequest { sequence };
match self.client.version_updates(input).await {
Ok(output) => Box::new(output.map(|result| match result {
Ok(resp) => Ok(Arc::new(resp.version_update.ok_or_else(|| {
Error::Internal("VersionUpdatesResponse::version_update is none".into())
})?)),
Err(status) => Err(status.into()),
})),
Err(e) => Box::new(futures::stream::once(futures::future::err(e))),
}
}
}