1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
mod error;
mod kernel;
mod manifest;
mod metadata;
pub mod file;
pub mod grpc;
mod local;
pub mod mem;
pub use async_trait::async_trait;
pub use engula_journal::{Event, Journal, Stream, Timestamp};
pub use engula_storage::{Bucket, Storage};
/// A boxed, dynamically dispatched stream whose items are `Result<T>`.
///
/// `Result` here is this crate's own alias, re-exported from the `error`
/// module. The trait object is additionally bounded by `Send + Unpin`.
pub type ResultStream<T> = Box<dyn futures::Stream<Item = Result<T>> + Send + Unpin>;
pub use self::{
error::{Error, Result},
kernel::{Kernel, KernelUpdate},
metadata::{Sequence, Version, VersionUpdate},
};
#[cfg(test)]
mod tests {
    use futures::TryStreamExt;

    use crate::*;

    /// Runs the shared kernel scenario against the in-memory kernel first and
    /// then against the file-backed kernel rooted in a temporary directory.
    #[tokio::test]
    async fn kernel() -> Result<()> {
        // The temporary directory is created up front and lives until the end
        // of the test, so it outlives the file-backed kernel.
        let tmp = tempfile::tempdir()?;
        test_kernel(mem::Kernel::open().await?).await?;
        test_kernel(file::Kernel::open(tmp.path()).await?).await?;
        Ok(())
    }

    /// Applies one `KernelUpdate` and checks that the first item produced by
    /// `version_updates(0)` equals the matching `VersionUpdate`.
    async fn test_kernel(kernel: impl Kernel) -> Result<()> {
        // The version update the kernel is expected to publish.
        let mut expected = VersionUpdate {
            sequence: 1,
            ..Default::default()
        };
        expected.add_meta.insert(String::from("a"), vec![b'b']);
        expected.remove_meta.push(String::from("b"));
        expected.add_objects.push(String::from("a"));
        expected.remove_objects.push(String::from("b"));

        // Obtain the update stream before the change is applied, then wait for
        // its first item on a separate task.
        let mut updates = kernel.version_updates(0).await;
        let watcher = tokio::spawn(async move {
            let first = updates.try_next().await.unwrap();
            let observed = first.unwrap();
            assert_eq!(*observed, expected);
        });

        // Build and apply the change that mirrors `expected`.
        let mut changes = KernelUpdate::default();
        changes.add_meta("a", "b");
        changes.remove_meta("b");
        changes.add_object("a");
        changes.remove_object("b");
        kernel.apply_update(changes).await?;

        // Propagate any assertion failure raised inside the spawned task.
        watcher.await.unwrap();
        Ok(())
    }
}