1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Submodules are declared without `pub`, so they stay private to this module;
// outside code reaches their contents only through the re-exports below.
mod client;
mod compose;
mod error;
mod kernel;
mod proto;
mod server;
// Public surface of this module. `compose::Kernel` is re-exported under the
// alias `ComposeKernel` so that it does not collide with `kernel::Kernel`,
// which is re-exported under its own name.
pub use self::{
client::Client,
compose::{FileKernel, Kernel as ComposeKernel, MemKernel},
kernel::Kernel,
server::Server,
};
#[cfg(test)]
mod tests {
    use futures::TryStreamExt;
    use tokio::net::TcpListener;
    use tokio_stream::wrappers::TcpListenerStream;

    use super::{MemKernel, Server};
    use crate::*;

    /// Result type used by everything in this module: any error is boxed.
    type TestResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;

    /// Binds a TCP listener on `127.0.0.1:0` and returns it together with the
    /// textual form of the address it was actually bound to.
    async fn bind_local() -> TestResult<(TcpListener, String)> {
        let socket = TcpListener::bind("127.0.0.1:0").await?;
        let bound = socket.local_addr()?.to_string();
        Ok((socket, bound))
    }

    /// Builds the `VersionUpdate` the watcher in `test` expects to receive
    /// after the kernel update has been applied.
    fn expected_version_update() -> VersionUpdate {
        let mut update = VersionUpdate {
            sequence: 1,
            ..Default::default()
        };
        update.add_meta.insert(String::from("a"), b"b".to_vec());
        update.remove_meta.push(String::from("b"));
        update.add_objects.push(String::from("a"));
        update.remove_objects.push(String::from("b"));
        update
    }

    /// Builds the `KernelUpdate` that `test` applies through the client.
    fn sample_kernel_update() -> KernelUpdate {
        let mut update = KernelUpdate::default();
        update.add_meta("a", "b");
        update.remove_meta("b");
        update.add_object("a");
        update.remove_object("b");
        update
    }

    /// Spawns a tonic server that exposes a journal service and a storage
    /// service, backed by `mem::Journal` and `mem::Storage`, on a freshly
    /// bound local port. Returns the address the server listens on.
    ///
    /// The serving task is detached; it panics if the server returns an error.
    async fn mock_journal_and_storage_server() -> TestResult<String> {
        let (socket, bound) = bind_local().await?;
        tokio::spawn(async move {
            let journal = mem::Journal::default();
            let storage = mem::Storage::default();
            let journal_service = engula_journal::grpc::Server::new(journal).into_service();
            let storage_service = engula_storage::grpc::Server::new(storage).into_service();
            tonic::transport::Server::builder()
                .add_service(journal_service)
                .add_service(storage_service)
                .serve_with_incoming(TcpListenerStream::new(socket))
                .await
                .unwrap();
        });
        Ok(bound)
    }

    /// End-to-end check: serve a `MemKernel` through `Server`, connect a
    /// `grpc::Kernel` client to it, apply one update, and verify both the
    /// streamed version update and the resulting current version.
    #[tokio::test(flavor = "multi_thread")]
    async fn test() -> TestResult<()> {
        // The same mock endpoint serves as both the journal and the storage address.
        let backend = mock_journal_and_storage_server().await?;

        let (socket, endpoint) = bind_local().await?;
        tokio::spawn(async move {
            let mem_kernel = MemKernel::open(&backend, &backend).await.unwrap();
            let service = Server::new(&backend, &backend, mem_kernel).into_service();
            tonic::transport::Server::builder()
                .add_service(service)
                .serve_with_incoming(TcpListenerStream::new(socket))
                .await
                .unwrap();
        });

        let kernel = grpc::Kernel::connect(&endpoint).await?;

        // Before any update is applied the version is expected to be empty.
        let initial = kernel.current_version().await?;
        assert_eq!(initial.sequence, 0);
        assert!(initial.meta.is_empty());
        assert!(initial.objects.is_empty());

        // Obtain the update stream before applying the update, then check the
        // first item it yields in a background task.
        let expected = expected_version_update();
        let mut updates = kernel.version_updates(0).await;
        let watcher = tokio::spawn(async move {
            let next = updates.try_next().await.unwrap();
            let update = next.unwrap();
            assert_eq!(*update, expected);
        });

        kernel.apply_update(sample_kernel_update()).await?;
        watcher.await.unwrap();

        let latest = kernel.current_version().await?;
        assert_eq!(latest.sequence, 1);
        assert_eq!(latest.meta.len(), 1);
        assert_eq!(latest.objects.len(), 1);
        Ok(())
    }
}