1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
//! Kitsune2 op store types.
use crate::{
BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp, builder, config,
};
use bytes::Bytes;
use futures::future::BoxFuture;
use std::cmp::Ordering;
use std::sync::Arc;
/// Protobuf types for the kitsune2 op store, included from
/// `proto/gen/kitsune2.op_store.rs`.
// NOTE(review): the included file appears to be generated (it lives under `proto/gen`, and the
// tests below use `prost::Message` on `Op`). If so, regenerate it rather than editing it by hand.
pub(crate) mod proto {
    include!("../proto/gen/kitsune2.op_store.rs");
}
pub use proto::Op;
/// An op paired with its metadata.
///
/// This is the fundamental unit of data in the kitsune2 system: the op's id together with
/// the raw bytes of the op itself.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MetaOp {
    /// Identifier of this op.
    pub op_id: OpId,
    /// The raw data of the op.
    pub op_data: Bytes,
}
impl From<Bytes> for Op {
fn from(value: Bytes) -> Self {
Self { data: value }
}
}
/// An op that the Kitsune host has stored.
///
/// This is the basic unit of data the host is expected to store. Whether that storage is
/// persistent may depend on the use-case. Kitsune keeps references to ops by hash, while the
/// host is expected to hold the actual op data.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StoredOp {
    /// Identifier of the op.
    pub op_id: OpId,
    /// When the op was created.
    ///
    /// Every party that sees this op must observe the same value.
    ///
    /// Consequently, any op implementation must embed a consistent timestamp in the op data so
    /// that it can be reported back to Kitsune.
    pub created_at: Timestamp,
}
/// Orders stored ops by creation time first, breaking ties by op id.
///
/// This agrees with the derived `PartialEq`/`Eq`: two ops compare `Equal` only when both fields
/// are equal. That holds provided `Timestamp` and `OpId` have consistent `Ord`/`Eq`.
impl Ord for StoredOp {
    fn cmp(&self, other: &Self) -> Ordering {
        self.created_at
            .cmp(&other.created_at)
            .then_with(|| self.op_id.cmp(&other.op_id))
    }
}

impl PartialOrd for StoredOp {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to the total order so the two impls can never disagree.
        Some(Ord::cmp(self, other))
    }
}
/// The API that a kitsune2 host must implement to provide data persistence for kitsune2.
///
/// Implementors must be `'static + Send + Sync + Debug`. The trait-object form is [DynOpStore].
#[cfg_attr(any(test, feature = "mockall"), mockall::automock)]
pub trait OpStore: 'static + Send + Sync + std::fmt::Debug {
    /// Process incoming ops.
    ///
    /// Pass the incoming ops to the host for processing. The host is expected to store the ops
    /// if it is able to process them.
    // NOTE(review): each entry in `op_list` is presumably one encoded op. The returned ids are
    // presumably those of the ops that were processed/stored. Confirm with implementations.
    fn process_incoming_ops(
        &self,
        op_list: Vec<Bytes>,
    ) -> BoxFuture<'_, K2Result<Vec<OpId>>>;

    /// Retrieve the ids of a batch of ops from the host by time range.
    ///
    /// The range applies to the timestamp of the op itself (its creation timestamp). It does
    /// not apply to the time at which the op was seen or chosen to be stored. The returned op
    /// ids must be ordered by timestamp, ascending.
    ///
    /// # Returns
    ///
    /// - The op ids that are stored in the given arc and time range. The start time is
    ///   inclusive and the end time is exclusive.
    /// - The total size of the op data that is pointed to by the returned op ids.
    fn retrieve_op_hashes_in_time_slice(
        &self,
        arc: DhtArc,
        start: Timestamp,
        end: Timestamp,
    ) -> BoxFuture<'_, K2Result<(Vec<OpId>, u32)>>;

    /// Retrieve a list of ops by their op ids.
    ///
    /// Use this to fetch the op data (as `MetaOp`s: id plus raw bytes) for known op ids.
    // NOTE(review): handling of ids that are not in the store is not specified here. Confirm
    // whether they are skipped or reported as an error.
    fn retrieve_ops(
        &self,
        op_ids: Vec<OpId>,
    ) -> BoxFuture<'_, K2Result<Vec<MetaOp>>>;

    /// Filter the passed-in op ids, removing those for ops that already exist in the store.
    ///
    /// # Returns
    ///
    /// The filtered list of op ids that do not exist in the store.
    fn filter_out_existing_ops(
        &self,
        op_ids: Vec<OpId>,
    ) -> BoxFuture<'_, K2Result<Vec<OpId>>>;

    /// Retrieve a size-bounded list of op ids that have been stored within the given `arc` since
    /// `start`.
    ///
    /// The `start` timestamp is compared against each op's `stored_at` timestamp rather than its
    /// creation timestamp. This means the `start` value can be used to page through an op
    /// store.
    ///
    /// `limit_bytes` applies to the size of the op data, not the size of the op ids. It can be
    /// thought of as a "page size" measured in bytes of op data rather than in number of ops.
    ///
    /// If the limit is applied, the timestamp of the last op id is returned. Otherwise, the
    /// timestamp at which this operation started is returned. Either way, the returned
    /// timestamp should be used as the `start` value for the next call to this op store.
    ///
    /// # Returns
    ///
    /// - As many op ids as can be returned within the `limit_bytes` limit.
    /// - The total size of the op data that is pointed to by the returned op ids.
    /// - A new timestamp to be used as `start` for the next query.
    fn retrieve_op_ids_bounded(
        &self,
        arc: DhtArc,
        start: Timestamp,
        limit_bytes: u32,
    ) -> BoxFuture<'_, K2Result<(Vec<OpId>, u32, Timestamp)>>;

    /// Get the earliest op timestamp in the given arc.
    ///
    /// If there are no ops in the arc, return None.
    /// Otherwise, return the earliest `created_at` timestamp of the ops in the arc.
    ///
    /// This is used to determine the earliest timestamp that the arc has seen. That timestamp
    /// is a lower bound on the work that needs to be done when building a summary model of the
    /// DHT.
    fn earliest_timestamp_in_arc(
        &self,
        arc: DhtArc,
    ) -> BoxFuture<'_, K2Result<Option<Timestamp>>>;

    /// Store the combined hash of a time slice.
    ///
    /// The hash is keyed by `arc` and `slice_index`. [Self::retrieve_slice_hash] must later
    /// return exactly these bytes for the same key.
    fn store_slice_hash(
        &self,
        arc: DhtArc,
        slice_index: u64,
        slice_hash: Bytes,
    ) -> BoxFuture<'_, K2Result<()>>;

    /// Count the number of time slice hashes stored for the given `arc`.
    fn slice_hash_count(&self, arc: DhtArc) -> BoxFuture<'_, K2Result<u64>>;

    /// Retrieve the combined hash of a time slice.
    ///
    /// If the slice is not found, return None.
    /// If the time slice is present, then it must be identical to what was stored by Kitsune2
    /// using [Self::store_slice_hash].
    fn retrieve_slice_hash(
        &self,
        arc: DhtArc,
        slice_index: u64,
    ) -> BoxFuture<'_, K2Result<Option<Bytes>>>;

    /// Retrieve all stored slice hashes for the given `arc`, as `(slice_index, slice_hash)`
    /// pairs.
    fn retrieve_slice_hashes(
        &self,
        arc: DhtArc,
    ) -> BoxFuture<'_, K2Result<Vec<(u64, Bytes)>>>;

    /// Return the total count of locally held ops.
    fn query_total_op_count(&self) -> BoxFuture<'_, K2Result<u64>>;
}
/// Trait-object [OpStore].
pub type DynOpStore = Arc<dyn OpStore>;
/// A factory for constructing [OpStore] instances.
///
/// The trait-object form is [DynOpStoreFactory].
pub trait OpStoreFactory: 'static + Send + Sync + std::fmt::Debug {
    /// Help the builder construct a default config from the chosen
    /// module factories.
    ///
    /// `config` is passed mutably so the factory can add its own defaults to it.
    fn default_config(&self, config: &mut config::Config) -> K2Result<()>;

    /// Validate configuration, returning an error if `config` is not acceptable to this
    /// factory.
    fn validate_config(&self, config: &config::Config) -> K2Result<()>;

    /// Construct an op store instance for the space identified by `space_id`.
    // NOTE(review): the returned future is `BoxFut<'static, _>`, so it presumably cannot borrow
    // from `&self`. Implementations should clone what they need before building the future.
    fn create(
        &self,
        builder: Arc<builder::Builder>,
        space_id: SpaceId,
    ) -> BoxFut<'static, K2Result<DynOpStore>>;
}
/// Trait-object [OpStoreFactory].
pub type DynOpStoreFactory = Arc<dyn OpStoreFactory>;
#[cfg(test)]
mod test {
    use super::*;
    use prost::Message;

    /// Encode `op` to its protobuf wire form and decode it back.
    fn round_trip(op: &Op) -> Op {
        let encoded = op.encode_to_vec();
        Op::decode(encoded.as_slice())
            .expect("decoding a freshly encoded Op must succeed")
    }

    // Named for `Op` (the protobuf message under test), not `MetaOp`, which is not encodable.
    #[test]
    fn happy_op_encode_decode() {
        let op = Op::from(Bytes::from(vec![1; 128]));
        assert_eq!(round_trip(&op), op);
    }

    #[test]
    fn empty_op_encode_decode() {
        // Edge case: an op with no data must survive a round trip unchanged.
        let op = Op::from(Bytes::new());
        assert_eq!(round_trip(&op), op);
    }
}