1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use async_trait::async_trait;
use std::{fmt::Display, result::Result as StdResult, sync::Arc};
mod prometheus {
include!(concat!(env!("OUT_DIR"), "/prometheus.rs"));
}
pub use prometheus::*;
#[derive(Debug)]
pub enum Error {
SnappyEncode(snap::Error),
SnappyDecode(snap::Error),
ProtoDecode(prost::DecodeError),
}
pub type Result<T> = StdResult<T, Error>;
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SnappyEncode(_) => f.write_str("SnappyEncode"),
Self::SnappyDecode(_) => f.write_str("SnappyDecode"),
Self::ProtoDecode(_) => f.write_str("ProtoDecode"),
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::SnappyEncode(e) => Some(e),
Self::SnappyDecode(e) => Some(e),
Self::ProtoDecode(e) => Some(e),
}
}
}
#[async_trait]
pub trait RemoteStorage: Sync {
type Err: Send;
type Context: Send + Sync;
async fn write(&self, ctx: Self::Context, req: WriteRequest) -> StdResult<(), Self::Err>;
async fn process_query(
&self,
ctx: &Self::Context,
q: Query,
) -> StdResult<QueryResult, Self::Err>;
async fn read(
&self,
ctx: Self::Context,
req: ReadRequest,
) -> StdResult<ReadResponse, Self::Err> {
let results = futures::future::join_all(
req.queries
.into_iter()
.map(|q| async { self.process_query(&ctx, q).await }),
)
.await
.into_iter()
.collect::<StdResult<Vec<_>, Self::Err>>()?;
Ok(ReadResponse { results })
}
}
pub type RemoteStorageRef<C, E> = Arc<dyn RemoteStorage<Err = E, Context = C> + Send + Sync>;